2ea722df79b086f1cd1ba6f728a53052026bae07
[platform/framework/web/crosswalk.git] / src / ipc / ipc_channel_proxy.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_channel_proxy.h"
6
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "ipc/ipc_channel_factory.h"
15 #include "ipc/ipc_listener.h"
16 #include "ipc/ipc_logging.h"
17 #include "ipc/ipc_message_macros.h"
18 #include "ipc/message_filter.h"
19 #include "ipc/message_filter_router.h"
20
21 namespace IPC {
22
23 //------------------------------------------------------------------------------
24
25 ChannelProxy::Context::Context(Listener* listener,
26                                base::SingleThreadTaskRunner* ipc_task_runner)
27     : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
28       listener_(listener),
29       ipc_task_runner_(ipc_task_runner),
30       channel_connected_called_(false),
31       message_filter_router_(new MessageFilterRouter()),
32       peer_pid_(base::kNullProcessId) {
33   DCHECK(ipc_task_runner_.get());
34   // The Listener thread where Messages are handled must be a separate thread
35   // to avoid oversubscribing the IO thread. If you trigger this error, you
36   // need to either:
37   // 1) Create the ChannelProxy on a different thread, or
38   // 2) Just use Channel
39   // Note, we currently make an exception for a NULL listener. That usage
40   // basically works, but is outside the intent of ChannelProxy. This support
41   // will disappear, so please don't rely on it. See crbug.com/364241
42   DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
43 }
44
45 ChannelProxy::Context::~Context() {
46 }
47
48 void ChannelProxy::Context::ClearIPCTaskRunner() {
49   ipc_task_runner_ = NULL;
50 }
51
52 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
53   DCHECK(!channel_);
54   channel_id_ = factory->GetName();
55   channel_ = factory->BuildChannel(this);
56 }
57
58 bool ChannelProxy::Context::TryFilters(const Message& message) {
59   DCHECK(message_filter_router_);
60 #ifdef IPC_MESSAGE_LOG_ENABLED
61   Logging* logger = Logging::GetInstance();
62   if (logger->Enabled())
63     logger->OnPreDispatchMessage(message);
64 #endif
65
66   if (message_filter_router_->TryFilters(message)) {
67     if (message.dispatch_error()) {
68       listener_task_runner_->PostTask(
69           FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
70     }
71 #ifdef IPC_MESSAGE_LOG_ENABLED
72     if (logger->Enabled())
73       logger->OnPostDispatchMessage(message, channel_id_);
74 #endif
75     return true;
76   }
77   return false;
78 }
79
80 // Called on the IPC::Channel thread
81 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
82   // First give a chance to the filters to process this message.
83   if (!TryFilters(message))
84     OnMessageReceivedNoFilter(message);
85   return true;
86 }
87
88 // Called on the IPC::Channel thread
89 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
90   listener_task_runner_->PostTask(
91       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
92   return true;
93 }
94
95 // Called on the IPC::Channel thread
96 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
97   // We cache off the peer_pid so it can be safely accessed from both threads.
98   peer_pid_ = channel_->GetPeerPID();
99
100   // Add any pending filters.  This avoids a race condition where someone
101   // creates a ChannelProxy, calls AddFilter, and then right after starts the
102   // peer process.  The IO thread could receive a message before the task to add
103   // the filter is run on the IO thread.
104   OnAddFilter();
105
106   // See above comment about using listener_task_runner_ here.
107   listener_task_runner_->PostTask(
108       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
109 }
110
111 // Called on the IPC::Channel thread
112 void ChannelProxy::Context::OnChannelError() {
113   for (size_t i = 0; i < filters_.size(); ++i)
114     filters_[i]->OnChannelError();
115
116   // See above comment about using listener_task_runner_ here.
117   listener_task_runner_->PostTask(
118       FROM_HERE, base::Bind(&Context::OnDispatchError, this));
119 }
120
121 // Called on the IPC::Channel thread
122 void ChannelProxy::Context::OnChannelOpened() {
123   DCHECK(channel_ != NULL);
124
125   // Assume a reference to ourselves on behalf of this thread.  This reference
126   // will be released when we are closed.
127   AddRef();
128
129   if (!channel_->Connect()) {
130     OnChannelError();
131     return;
132   }
133
134   for (size_t i = 0; i < filters_.size(); ++i)
135     filters_[i]->OnFilterAdded(channel_.get());
136 }
137
138 // Called on the IPC::Channel thread
139 void ChannelProxy::Context::OnChannelClosed() {
140   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
141   // would result in this branch being taken.
142   if (!channel_)
143     return;
144
145   for (size_t i = 0; i < filters_.size(); ++i) {
146     filters_[i]->OnChannelClosing();
147     filters_[i]->OnFilterRemoved();
148   }
149
150   // We don't need the filters anymore.
151   message_filter_router_->Clear();
152   filters_.clear();
153   // We don't need the lock, because at this point, the listener thread can't
154   // access it any more.
155   pending_filters_.clear();
156
157   channel_.reset();
158
159   // Balance with the reference taken during startup.  This may result in
160   // self-destruction.
161   Release();
162 }
163
164 void ChannelProxy::Context::Clear() {
165   listener_ = NULL;
166 }
167
168 // Called on the IPC::Channel thread
169 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
170   if (!channel_) {
171     OnChannelClosed();
172     return;
173   }
174
175   if (!channel_->Send(message.release()))
176     OnChannelError();
177 }
178
179 // Called on the IPC::Channel thread
180 void ChannelProxy::Context::OnAddFilter() {
181   // Our OnChannelConnected method has not yet been called, so we can't be
182   // sure that channel_ is valid yet. When OnChannelConnected *is* called,
183   // it invokes OnAddFilter, so any pending filter(s) will be added at that
184   // time.
185   if (peer_pid_ == base::kNullProcessId)
186     return;
187
188   std::vector<scoped_refptr<MessageFilter> > new_filters;
189   {
190     base::AutoLock auto_lock(pending_filters_lock_);
191     new_filters.swap(pending_filters_);
192   }
193
194   for (size_t i = 0; i < new_filters.size(); ++i) {
195     filters_.push_back(new_filters[i]);
196
197     message_filter_router_->AddFilter(new_filters[i].get());
198
199     // The channel has already been created and connected, so we need to
200     // inform the filters right now.
201     new_filters[i]->OnFilterAdded(channel_.get());
202     new_filters[i]->OnChannelConnected(peer_pid_);
203   }
204 }
205
206 // Called on the IPC::Channel thread
207 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
208   if (peer_pid_ == base::kNullProcessId) {
209     // The channel is not yet connected, so any filters are still pending.
210     base::AutoLock auto_lock(pending_filters_lock_);
211     for (size_t i = 0; i < pending_filters_.size(); ++i) {
212       if (pending_filters_[i].get() == filter) {
213         filter->OnFilterRemoved();
214         pending_filters_.erase(pending_filters_.begin() + i);
215         return;
216       }
217     }
218     return;
219   }
220   if (!channel_)
221     return;  // The filters have already been deleted.
222
223   message_filter_router_->RemoveFilter(filter);
224
225   for (size_t i = 0; i < filters_.size(); ++i) {
226     if (filters_[i].get() == filter) {
227       filter->OnFilterRemoved();
228       filters_.erase(filters_.begin() + i);
229       return;
230     }
231   }
232
233   NOTREACHED() << "filter to be removed not found";
234 }
235
236 // Called on the listener's thread
237 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
238   base::AutoLock auto_lock(pending_filters_lock_);
239   pending_filters_.push_back(make_scoped_refptr(filter));
240   ipc_task_runner_->PostTask(
241       FROM_HERE, base::Bind(&Context::OnAddFilter, this));
242 }
243
244 // Called on the listener's thread
245 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
246 #ifdef IPC_MESSAGE_LOG_ENABLED
247   Logging* logger = Logging::GetInstance();
248   std::string name;
249   logger->GetMessageText(message.type(), &name, &message, NULL);
250   TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
251                "name", name);
252 #else
253   TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
254                "class", IPC_MESSAGE_ID_CLASS(message.type()),
255                "line", IPC_MESSAGE_ID_LINE(message.type()));
256 #endif
257
258   if (!listener_)
259     return;
260
261   OnDispatchConnected();
262
263 #ifdef IPC_MESSAGE_LOG_ENABLED
264   if (message.type() == IPC_LOGGING_ID) {
265     logger->OnReceivedLoggingMessage(message);
266     return;
267   }
268
269   if (logger->Enabled())
270     logger->OnPreDispatchMessage(message);
271 #endif
272
273   listener_->OnMessageReceived(message);
274   if (message.dispatch_error())
275     listener_->OnBadMessageReceived(message);
276
277 #ifdef IPC_MESSAGE_LOG_ENABLED
278   if (logger->Enabled())
279     logger->OnPostDispatchMessage(message, channel_id_);
280 #endif
281 }
282
283 // Called on the listener's thread
284 void ChannelProxy::Context::OnDispatchConnected() {
285   if (channel_connected_called_)
286     return;
287
288   channel_connected_called_ = true;
289   if (listener_)
290     listener_->OnChannelConnected(peer_pid_);
291 }
292
293 // Called on the listener's thread
294 void ChannelProxy::Context::OnDispatchError() {
295   if (listener_)
296     listener_->OnChannelError();
297 }
298
299 // Called on the listener's thread
300 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
301   if (listener_)
302     listener_->OnBadMessageReceived(message);
303 }
304
305 //-----------------------------------------------------------------------------
306
307 // static
308 scoped_ptr<ChannelProxy> ChannelProxy::Create(
309     const IPC::ChannelHandle& channel_handle,
310     Channel::Mode mode,
311     Listener* listener,
312     base::SingleThreadTaskRunner* ipc_task_runner) {
313   scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
314   channel->Init(channel_handle, mode, true);
315   return channel.Pass();
316 }
317
318 // static
319 scoped_ptr<ChannelProxy> ChannelProxy::Create(
320     scoped_ptr<ChannelFactory> factory,
321     Listener* listener,
322     base::SingleThreadTaskRunner* ipc_task_runner) {
323   scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
324   channel->Init(factory.Pass(), true);
325   return channel.Pass();
326 }
327
328 ChannelProxy::ChannelProxy(Context* context)
329     : context_(context),
330       did_init_(false) {
331 }
332
333 ChannelProxy::ChannelProxy(Listener* listener,
334                            base::SingleThreadTaskRunner* ipc_task_runner)
335     : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
336 }
337
338 ChannelProxy::~ChannelProxy() {
339   DCHECK(CalledOnValidThread());
340
341   Close();
342 }
343
344 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
345                         Channel::Mode mode,
346                         bool create_pipe_now) {
347 #if defined(OS_POSIX)
348   // When we are creating a server on POSIX, we need its file descriptor
349   // to be created immediately so that it can be accessed and passed
350   // to other processes. Forcing it to be created immediately avoids
351   // race conditions that may otherwise arise.
352   if (mode & Channel::MODE_SERVER_FLAG) {
353     create_pipe_now = true;
354   }
355 #endif  // defined(OS_POSIX)
356   Init(ChannelFactory::Create(channel_handle, mode),
357        create_pipe_now);
358 }
359
360 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
361                         bool create_pipe_now) {
362   DCHECK(CalledOnValidThread());
363   DCHECK(!did_init_);
364
365   if (create_pipe_now) {
366     // Create the channel immediately.  This effectively sets up the
367     // low-level pipe so that the client can connect.  Without creating
368     // the pipe immediately, it is possible for a listener to attempt
369     // to connect and get an error since the pipe doesn't exist yet.
370     context_->CreateChannel(factory.Pass());
371   } else {
372     context_->ipc_task_runner()->PostTask(
373         FROM_HERE, base::Bind(&Context::CreateChannel,
374                               context_.get(), Passed(factory.Pass())));
375   }
376
377   // complete initialization on the background thread
378   context_->ipc_task_runner()->PostTask(
379       FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
380
381   did_init_ = true;
382 }
383
384 void ChannelProxy::Close() {
385   DCHECK(CalledOnValidThread());
386
387   // Clear the backpointer to the listener so that any pending calls to
388   // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
389   // possible that the channel could be closed while it is receiving messages!
390   context_->Clear();
391
392   if (context_->ipc_task_runner()) {
393     context_->ipc_task_runner()->PostTask(
394         FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
395   }
396 }
397
398 bool ChannelProxy::Send(Message* message) {
399   DCHECK(did_init_);
400
401   // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
402   // tests that call Send() from a wrong thread. See http://crbug.com/163523.
403
404 #ifdef IPC_MESSAGE_LOG_ENABLED
405   Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
406 #endif
407
408   context_->ipc_task_runner()->PostTask(
409       FROM_HERE,
410       base::Bind(&ChannelProxy::Context::OnSendMessage,
411                  context_, base::Passed(scoped_ptr<Message>(message))));
412   return true;
413 }
414
415 void ChannelProxy::AddFilter(MessageFilter* filter) {
416   DCHECK(CalledOnValidThread());
417
418   context_->AddFilter(filter);
419 }
420
421 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
422   DCHECK(CalledOnValidThread());
423
424   context_->ipc_task_runner()->PostTask(
425       FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
426                             make_scoped_refptr(filter)));
427 }
428
429 void ChannelProxy::ClearIPCTaskRunner() {
430   DCHECK(CalledOnValidThread());
431
432   context()->ClearIPCTaskRunner();
433 }
434
435 #if defined(OS_POSIX) && !defined(OS_NACL)
436 // See the TODO regarding lazy initialization of the channel in
437 // ChannelProxy::Init().
438 int ChannelProxy::GetClientFileDescriptor() {
439   DCHECK(CalledOnValidThread());
440
441   Channel* channel = context_.get()->channel_.get();
442   // Channel must have been created first.
443   DCHECK(channel) << context_.get()->channel_id_;
444   return channel->GetClientFileDescriptor();
445 }
446
447 int ChannelProxy::TakeClientFileDescriptor() {
448   DCHECK(CalledOnValidThread());
449
450   Channel* channel = context_.get()->channel_.get();
451   // Channel must have been created first.
452   DCHECK(channel) << context_.get()->channel_id_;
453   return channel->TakeClientFileDescriptor();
454 }
455 #endif
456
457 //-----------------------------------------------------------------------------
458
459 }  // namespace IPC