5c4d743cb3c9b0274cd23c86037c546a0e3c0009
[platform/framework/web/crosswalk.git] / src / ipc / ipc_channel_proxy.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_channel_proxy.h"
6
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "ipc/ipc_listener.h"
15 #include "ipc/ipc_logging.h"
16 #include "ipc/ipc_message_macros.h"
17 #include "ipc/message_filter.h"
18 #include "ipc/message_filter_router.h"
19
20 namespace IPC {
21
22 //------------------------------------------------------------------------------
23
24 ChannelProxy::Context::Context(Listener* listener,
25                                base::SingleThreadTaskRunner* ipc_task_runner)
26     : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
27       listener_(listener),
28       ipc_task_runner_(ipc_task_runner),
29       channel_connected_called_(false),
30       message_filter_router_(new MessageFilterRouter()),
31       peer_pid_(base::kNullProcessId) {
32   DCHECK(ipc_task_runner_.get());
33   // The Listener thread where Messages are handled must be a separate thread
34   // to avoid oversubscribing the IO thread. If you trigger this error, you
35   // need to either:
36   // 1) Create the ChannelProxy on a different thread, or
37   // 2) Just use Channel
38   // Note, we currently make an exception for a NULL listener. That usage
39   // basically works, but is outside the intent of ChannelProxy. This support
40   // will disappear, so please don't rely on it. See crbug.com/364241
41   DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
42 }
43
44 ChannelProxy::Context::~Context() {
45 }
46
47 void ChannelProxy::Context::ClearIPCTaskRunner() {
48   ipc_task_runner_ = NULL;
49 }
50
51 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
52                                           const Channel::Mode& mode) {
53   DCHECK(!channel_);
54   channel_id_ = handle.name;
55   channel_.reset(new Channel(handle, mode, this));
56 }
57
58 bool ChannelProxy::Context::TryFilters(const Message& message) {
59   DCHECK(message_filter_router_);
60 #ifdef IPC_MESSAGE_LOG_ENABLED
61   Logging* logger = Logging::GetInstance();
62   if (logger->Enabled())
63     logger->OnPreDispatchMessage(message);
64 #endif
65
66   if (message_filter_router_->TryFilters(message)) {
67 #ifdef IPC_MESSAGE_LOG_ENABLED
68     if (logger->Enabled())
69       logger->OnPostDispatchMessage(message, channel_id_);
70 #endif
71     return true;
72   }
73   return false;
74 }
75
76 // Called on the IPC::Channel thread
77 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
78   // First give a chance to the filters to process this message.
79   if (!TryFilters(message))
80     OnMessageReceivedNoFilter(message);
81   return true;
82 }
83
84 // Called on the IPC::Channel thread
85 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
86   listener_task_runner_->PostTask(
87       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
88   return true;
89 }
90
91 // Called on the IPC::Channel thread
92 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
93   // We cache off the peer_pid so it can be safely accessed from both threads.
94   peer_pid_ = channel_->peer_pid();
95
96   // Add any pending filters.  This avoids a race condition where someone
97   // creates a ChannelProxy, calls AddFilter, and then right after starts the
98   // peer process.  The IO thread could receive a message before the task to add
99   // the filter is run on the IO thread.
100   OnAddFilter();
101
102   // See above comment about using listener_task_runner_ here.
103   listener_task_runner_->PostTask(
104       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
105 }
106
107 // Called on the IPC::Channel thread
108 void ChannelProxy::Context::OnChannelError() {
109   for (size_t i = 0; i < filters_.size(); ++i)
110     filters_[i]->OnChannelError();
111
112   // See above comment about using listener_task_runner_ here.
113   listener_task_runner_->PostTask(
114       FROM_HERE, base::Bind(&Context::OnDispatchError, this));
115 }
116
117 // Called on the IPC::Channel thread
118 void ChannelProxy::Context::OnChannelOpened() {
119   DCHECK(channel_ != NULL);
120
121   // Assume a reference to ourselves on behalf of this thread.  This reference
122   // will be released when we are closed.
123   AddRef();
124
125   if (!channel_->Connect()) {
126     OnChannelError();
127     return;
128   }
129
130   for (size_t i = 0; i < filters_.size(); ++i)
131     filters_[i]->OnFilterAdded(channel_.get());
132 }
133
134 // Called on the IPC::Channel thread
135 void ChannelProxy::Context::OnChannelClosed() {
136   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
137   // would result in this branch being taken.
138   if (!channel_)
139     return;
140
141   for (size_t i = 0; i < filters_.size(); ++i) {
142     filters_[i]->OnChannelClosing();
143     filters_[i]->OnFilterRemoved();
144   }
145
146   // We don't need the filters anymore.
147   message_filter_router_->Clear();
148   filters_.clear();
149   // We don't need the lock, because at this point, the listener thread can't
150   // access it any more.
151   pending_filters_.clear();
152
153   channel_.reset();
154
155   // Balance with the reference taken during startup.  This may result in
156   // self-destruction.
157   Release();
158 }
159
160 void ChannelProxy::Context::Clear() {
161   listener_ = NULL;
162 }
163
164 // Called on the IPC::Channel thread
165 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
166   if (!channel_) {
167     OnChannelClosed();
168     return;
169   }
170
171   if (!channel_->Send(message.release()))
172     OnChannelError();
173 }
174
175 // Called on the IPC::Channel thread
176 void ChannelProxy::Context::OnAddFilter() {
177   // Our OnChannelConnected method has not yet been called, so we can't be
178   // sure that channel_ is valid yet. When OnChannelConnected *is* called,
179   // it invokes OnAddFilter, so any pending filter(s) will be added at that
180   // time.
181   if (peer_pid_ == base::kNullProcessId)
182     return;
183
184   std::vector<scoped_refptr<MessageFilter> > new_filters;
185   {
186     base::AutoLock auto_lock(pending_filters_lock_);
187     new_filters.swap(pending_filters_);
188   }
189
190   for (size_t i = 0; i < new_filters.size(); ++i) {
191     filters_.push_back(new_filters[i]);
192
193     message_filter_router_->AddFilter(new_filters[i].get());
194
195     // The channel has already been created and connected, so we need to
196     // inform the filters right now.
197     new_filters[i]->OnFilterAdded(channel_.get());
198     new_filters[i]->OnChannelConnected(peer_pid_);
199   }
200 }
201
202 // Called on the IPC::Channel thread
203 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
204   if (peer_pid_ == base::kNullProcessId) {
205     // The channel is not yet connected, so any filters are still pending.
206     base::AutoLock auto_lock(pending_filters_lock_);
207     for (size_t i = 0; i < pending_filters_.size(); ++i) {
208       if (pending_filters_[i].get() == filter) {
209         filter->OnFilterRemoved();
210         pending_filters_.erase(pending_filters_.begin() + i);
211         return;
212       }
213     }
214     return;
215   }
216   if (!channel_)
217     return;  // The filters have already been deleted.
218
219   message_filter_router_->RemoveFilter(filter);
220
221   for (size_t i = 0; i < filters_.size(); ++i) {
222     if (filters_[i].get() == filter) {
223       filter->OnFilterRemoved();
224       filters_.erase(filters_.begin() + i);
225       return;
226     }
227   }
228
229   NOTREACHED() << "filter to be removed not found";
230 }
231
232 // Called on the listener's thread
233 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
234   base::AutoLock auto_lock(pending_filters_lock_);
235   pending_filters_.push_back(make_scoped_refptr(filter));
236   ipc_task_runner_->PostTask(
237       FROM_HERE, base::Bind(&Context::OnAddFilter, this));
238 }
239
240 // Called on the listener's thread
241 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
242 #ifdef IPC_MESSAGE_LOG_ENABLED
243   Logging* logger = Logging::GetInstance();
244   std::string name;
245   logger->GetMessageText(message.type(), &name, &message, NULL);
246   TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
247                "name", name);
248 #else
249   TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
250                "class", IPC_MESSAGE_ID_CLASS(message.type()),
251                "line", IPC_MESSAGE_ID_LINE(message.type()));
252 #endif
253
254   if (!listener_)
255     return;
256
257   OnDispatchConnected();
258
259 #ifdef IPC_MESSAGE_LOG_ENABLED
260   if (message.type() == IPC_LOGGING_ID) {
261     logger->OnReceivedLoggingMessage(message);
262     return;
263   }
264
265   if (logger->Enabled())
266     logger->OnPreDispatchMessage(message);
267 #endif
268
269   listener_->OnMessageReceived(message);
270
271 #ifdef IPC_MESSAGE_LOG_ENABLED
272   if (logger->Enabled())
273     logger->OnPostDispatchMessage(message, channel_id_);
274 #endif
275 }
276
277 // Called on the listener's thread
278 void ChannelProxy::Context::OnDispatchConnected() {
279   if (channel_connected_called_)
280     return;
281
282   channel_connected_called_ = true;
283   if (listener_)
284     listener_->OnChannelConnected(peer_pid_);
285 }
286
287 // Called on the listener's thread
288 void ChannelProxy::Context::OnDispatchError() {
289   if (listener_)
290     listener_->OnChannelError();
291 }
292
293 //-----------------------------------------------------------------------------
294
295 ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
296                            Channel::Mode mode,
297                            Listener* listener,
298                            base::SingleThreadTaskRunner* ipc_task_runner)
299     : context_(new Context(listener, ipc_task_runner)),
300       did_init_(false) {
301   Init(channel_handle, mode, true);
302 }
303
304 ChannelProxy::ChannelProxy(Context* context)
305     : context_(context),
306       did_init_(false) {
307 }
308
309 ChannelProxy::~ChannelProxy() {
310   DCHECK(CalledOnValidThread());
311
312   Close();
313 }
314
315 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
316                         Channel::Mode mode,
317                         bool create_pipe_now) {
318   DCHECK(CalledOnValidThread());
319   DCHECK(!did_init_);
320 #if defined(OS_POSIX)
321   // When we are creating a server on POSIX, we need its file descriptor
322   // to be created immediately so that it can be accessed and passed
323   // to other processes. Forcing it to be created immediately avoids
324   // race conditions that may otherwise arise.
325   if (mode & Channel::MODE_SERVER_FLAG) {
326     create_pipe_now = true;
327   }
328 #endif  // defined(OS_POSIX)
329
330   if (create_pipe_now) {
331     // Create the channel immediately.  This effectively sets up the
332     // low-level pipe so that the client can connect.  Without creating
333     // the pipe immediately, it is possible for a listener to attempt
334     // to connect and get an error since the pipe doesn't exist yet.
335     context_->CreateChannel(channel_handle, mode);
336   } else {
337     context_->ipc_task_runner()->PostTask(
338         FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
339                               channel_handle, mode));
340   }
341
342   // complete initialization on the background thread
343   context_->ipc_task_runner()->PostTask(
344       FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
345
346   did_init_ = true;
347 }
348
349 void ChannelProxy::Close() {
350   DCHECK(CalledOnValidThread());
351
352   // Clear the backpointer to the listener so that any pending calls to
353   // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
354   // possible that the channel could be closed while it is receiving messages!
355   context_->Clear();
356
357   if (context_->ipc_task_runner()) {
358     context_->ipc_task_runner()->PostTask(
359         FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
360   }
361 }
362
363 bool ChannelProxy::Send(Message* message) {
364   DCHECK(did_init_);
365
366   // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
367   // tests that call Send() from a wrong thread. See http://crbug.com/163523.
368
369 #ifdef IPC_MESSAGE_LOG_ENABLED
370   Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
371 #endif
372
373   context_->ipc_task_runner()->PostTask(
374       FROM_HERE,
375       base::Bind(&ChannelProxy::Context::OnSendMessage,
376                  context_, base::Passed(scoped_ptr<Message>(message))));
377   return true;
378 }
379
380 void ChannelProxy::AddFilter(MessageFilter* filter) {
381   DCHECK(CalledOnValidThread());
382
383   context_->AddFilter(filter);
384 }
385
386 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
387   DCHECK(CalledOnValidThread());
388
389   context_->ipc_task_runner()->PostTask(
390       FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
391                             make_scoped_refptr(filter)));
392 }
393
394 void ChannelProxy::ClearIPCTaskRunner() {
395   DCHECK(CalledOnValidThread());
396
397   context()->ClearIPCTaskRunner();
398 }
399
400 #if defined(OS_POSIX) && !defined(OS_NACL)
401 // See the TODO regarding lazy initialization of the channel in
402 // ChannelProxy::Init().
403 int ChannelProxy::GetClientFileDescriptor() {
404   DCHECK(CalledOnValidThread());
405
406   Channel* channel = context_.get()->channel_.get();
407   // Channel must have been created first.
408   DCHECK(channel) << context_.get()->channel_id_;
409   return channel->GetClientFileDescriptor();
410 }
411
412 int ChannelProxy::TakeClientFileDescriptor() {
413   DCHECK(CalledOnValidThread());
414
415   Channel* channel = context_.get()->channel_.get();
416   // Channel must have been created first.
417   DCHECK(channel) << context_.get()->channel_id_;
418   return channel->TakeClientFileDescriptor();
419 }
420
421 bool ChannelProxy::GetPeerEuid(uid_t* peer_euid) const {
422   DCHECK(CalledOnValidThread());
423
424   Channel* channel = context_.get()->channel_.get();
425   // Channel must have been created first.
426   DCHECK(channel) << context_.get()->channel_id_;
427   return channel->GetPeerEuid(peer_euid);
428 }
429 #endif
430
431 //-----------------------------------------------------------------------------
432
433 }  // namespace IPC