Upstream version 11.40.271.0
[platform/framework/web/crosswalk.git] / src / mojo / edk / system / channel.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/channel.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/edk/embedder/platform_handle_vector.h"
15 #include "mojo/edk/system/transport_data.h"
16
17 namespace mojo {
18 namespace system {
19
20 Channel::Channel(embedder::PlatformSupport* platform_support)
21     : platform_support_(platform_support),
22       is_running_(false),
23       is_shutting_down_(false) {
24 }
25
26 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
27   DCHECK(creation_thread_checker_.CalledOnValidThread());
28   DCHECK(raw_channel);
29
30   // No need to take |lock_|, since this must be called before this object
31   // becomes thread-safe.
32   DCHECK(!is_running_);
33   raw_channel_ = raw_channel.Pass();
34
35   if (!raw_channel_->Init(this)) {
36     raw_channel_.reset();
37     return false;
38   }
39
40   is_running_ = true;
41   return true;
42 }
43
44 void Channel::Shutdown() {
45   DCHECK(creation_thread_checker_.CalledOnValidThread());
46
47   IdToEndpointMap to_destroy;
48   {
49     base::AutoLock locker(lock_);
50     if (!is_running_)
51       return;
52
53     // Note: Don't reset |raw_channel_|, in case we're being called from within
54     // |OnReadMessage()| or |OnError()|.
55     raw_channel_->Shutdown();
56     is_running_ = false;
57
58     // We need to deal with it outside the lock.
59     std::swap(to_destroy, local_id_to_endpoint_map_);
60   }
61
62   size_t num_live = 0;
63   size_t num_zombies = 0;
64   for (IdToEndpointMap::iterator it = to_destroy.begin();
65        it != to_destroy.end();
66        ++it) {
67     if (it->second.get()) {
68       num_live++;
69       it->second->OnDisconnect();
70       it->second->DetachFromChannel();
71     } else {
72       num_zombies++;
73     }
74   }
75   DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
76                                        << " live endpoints and " << num_zombies
77                                        << " zombies";
78 }
79
80 void Channel::WillShutdownSoon() {
81   base::AutoLock locker(lock_);
82   is_shutting_down_ = true;
83 }
84
85 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
86 // keeps the endpoint alive even after the lock is released. Otherwise, there's
87 // the temptation to simply pass the result of |new ChannelEndpoint(...)|
88 // directly to this function, which wouldn't be sufficient for safety.
89 ChannelEndpointId Channel::AttachAndRunEndpoint(
90     scoped_refptr<ChannelEndpoint> endpoint,
91     bool is_bootstrap) {
92   DCHECK(endpoint.get());
93
94   ChannelEndpointId local_id;
95   ChannelEndpointId remote_id;
96   {
97     base::AutoLock locker(lock_);
98
99     DLOG_IF(WARNING, is_shutting_down_)
100         << "AttachEndpoint() while shutting down";
101
102     if (is_bootstrap) {
103       local_id = ChannelEndpointId::GetBootstrap();
104       DCHECK(local_id_to_endpoint_map_.find(local_id) ==
105              local_id_to_endpoint_map_.end());
106
107       remote_id = ChannelEndpointId::GetBootstrap();
108     } else {
109       do {
110         local_id = local_id_generator_.GetNext();
111       } while (local_id_to_endpoint_map_.find(local_id) !=
112                local_id_to_endpoint_map_.end());
113
114       // TODO(vtl): We also need to check for collisions of remote IDs here.
115       remote_id = remote_id_generator_.GetNext();
116     }
117
118     local_id_to_endpoint_map_[local_id] = endpoint;
119   }
120
121   if (!is_bootstrap) {
122     if (!SendControlMessage(
123             MessageInTransit::kSubtypeChannelAttachAndRunEndpoint,
124             local_id,
125             remote_id)) {
126       HandleLocalError(base::StringPrintf(
127           "Failed to send message to run remote message pipe endpoint (local "
128           "ID %u, remote ID %u)",
129           static_cast<unsigned>(local_id.value()),
130           static_cast<unsigned>(remote_id.value())));
131       // TODO(vtl): Should we continue on to |AttachAndRun()|?
132     }
133   }
134
135   endpoint->AttachAndRun(this, local_id, remote_id);
136   return remote_id;
137 }
138
139 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
140   base::AutoLock locker(lock_);
141   if (!is_running_) {
142     // TODO(vtl): I think this is probably not an error condition, but I should
143     // think about it (and the shutdown sequence) more carefully.
144     LOG(WARNING) << "WriteMessage() after shutdown";
145     return false;
146   }
147
148   DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
149   return raw_channel_->WriteMessage(message.Pass());
150 }
151
152 bool Channel::IsWriteBufferEmpty() {
153   base::AutoLock locker(lock_);
154   if (!is_running_)
155     return true;
156   return raw_channel_->IsWriteBufferEmpty();
157 }
158
159 void Channel::DetachEndpoint(ChannelEndpoint* endpoint,
160                              ChannelEndpointId local_id,
161                              ChannelEndpointId remote_id) {
162   DCHECK(endpoint);
163   DCHECK(local_id.is_valid());
164
165   if (!remote_id.is_valid())
166     return;  // Nothing to do.
167
168   {
169     base::AutoLock locker_(lock_);
170     if (!is_running_)
171       return;
172
173     IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
174     // We detach immediately if we receive a remove message, so it's possible
175     // that the local ID is no longer in |local_id_to_endpoint_map_|, or even
176     // that it's since been reused for another endpoint. In both cases, there's
177     // nothing more to do.
178     if (it == local_id_to_endpoint_map_.end() || it->second.get() != endpoint)
179       return;
180
181     DCHECK(it->second.get());
182     it->second = nullptr;
183
184     // Send a remove message outside the lock.
185   }
186
187   if (!SendControlMessage(
188           MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
189           local_id,
190           remote_id)) {
191     HandleLocalError(base::StringPrintf(
192         "Failed to send message to remove remote message pipe endpoint (local "
193         "ID %u, remote ID %u)",
194         static_cast<unsigned>(local_id.value()),
195         static_cast<unsigned>(remote_id.value())));
196   }
197 }
198
199 scoped_refptr<MessagePipe> Channel::PassIncomingMessagePipe(
200     ChannelEndpointId local_id) {
201   // No need to check the validity of |local_id| -- if it's not valid, it simply
202   // won't be in |incoming_message_pipes_|.
203   DVLOG_IF(2, !local_id.is_valid() || !local_id.is_remote())
204       << "Attempt to get invalid incoming message pipe for ID " << local_id;
205
206   base::AutoLock locker(lock_);
207
208   auto it = incoming_message_pipes_.find(local_id);
209   if (it == incoming_message_pipes_.end())
210     return scoped_refptr<MessagePipe>();
211
212   scoped_refptr<MessagePipe> rv;
213   rv.swap(it->second);
214   incoming_message_pipes_.erase(it);
215   return rv;
216 }
217
218 size_t Channel::GetSerializedPlatformHandleSize() const {
219   return raw_channel_->GetSerializedPlatformHandleSize();
220 }
221
222 Channel::~Channel() {
223   // The channel should have been shut down first.
224   DCHECK(!is_running_);
225 }
226
227 void Channel::OnReadMessage(
228     const MessageInTransit::View& message_view,
229     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
230   DCHECK(creation_thread_checker_.CalledOnValidThread());
231
232   switch (message_view.type()) {
233     case MessageInTransit::kTypeMessagePipeEndpoint:
234     case MessageInTransit::kTypeMessagePipe:
235       OnReadMessageForDownstream(message_view, platform_handles.Pass());
236       break;
237     case MessageInTransit::kTypeChannel:
238       OnReadMessageForChannel(message_view, platform_handles.Pass());
239       break;
240     default:
241       HandleRemoteError(
242           base::StringPrintf("Received message of invalid type %u",
243                              static_cast<unsigned>(message_view.type())));
244       break;
245   }
246 }
247
248 void Channel::OnError(Error error) {
249   DCHECK(creation_thread_checker_.CalledOnValidThread());
250
251   switch (error) {
252     case ERROR_READ_SHUTDOWN:
253       // The other side was cleanly closed, so this isn't actually an error.
254       DVLOG(1) << "RawChannel read error (shutdown)";
255       break;
256     case ERROR_READ_BROKEN: {
257       base::AutoLock locker(lock_);
258       LOG_IF(ERROR, !is_shutting_down_)
259           << "RawChannel read error (connection broken)";
260       break;
261     }
262     case ERROR_READ_BAD_MESSAGE:
263       // Receiving a bad message means either a bug, data corruption, or
264       // malicious attack (probably due to some other bug).
265       LOG(ERROR) << "RawChannel read error (received bad message)";
266       break;
267     case ERROR_READ_UNKNOWN:
268       LOG(ERROR) << "RawChannel read error (unknown)";
269       break;
270     case ERROR_WRITE:
271       // Write errors are slightly notable: they probably shouldn't happen under
272       // normal operation (but maybe the other side crashed).
273       LOG(WARNING) << "RawChannel write error";
274       break;
275   }
276   Shutdown();
277 }
278
279 void Channel::OnReadMessageForDownstream(
280     const MessageInTransit::View& message_view,
281     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
282   DCHECK(creation_thread_checker_.CalledOnValidThread());
283   DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
284          message_view.type() == MessageInTransit::kTypeMessagePipe);
285
286   ChannelEndpointId local_id = message_view.destination_id();
287   if (!local_id.is_valid()) {
288     HandleRemoteError("Received message with no destination ID");
289     return;
290   }
291
292   scoped_refptr<ChannelEndpoint> endpoint;
293   {
294     base::AutoLock locker(lock_);
295
296     // Since we own |raw_channel_|, and this method and |Shutdown()| should only
297     // be called from the creation thread, |raw_channel_| should never be null
298     // here.
299     DCHECK(is_running_);
300
301     IdToEndpointMap::const_iterator it =
302         local_id_to_endpoint_map_.find(local_id);
303     if (it != local_id_to_endpoint_map_.end()) {
304       // Ignore messages for zombie endpoints (not an error).
305       if (!it->second.get()) {
306         DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID "
307                     "= " << local_id
308                  << ", remote ID = " << message_view.source_id() << ")";
309         return;
310       }
311
312       endpoint = it->second;
313     }
314   }
315   if (!endpoint.get()) {
316     HandleRemoteError(base::StringPrintf(
317         "Received a message for nonexistent local destination ID %u",
318         static_cast<unsigned>(local_id.value())));
319     // This is strongly indicative of some problem. However, it's not a fatal
320     // error, since it may indicate a buggy (or hostile) remote process. Don't
321     // die even for Debug builds, since handling this properly needs to be
322     // tested (TODO(vtl)).
323     DLOG(ERROR) << "This should not happen under normal operation.";
324     return;
325   }
326
327   if (!endpoint->OnReadMessage(message_view, platform_handles.Pass())) {
328     HandleLocalError(
329         base::StringPrintf("Failed to enqueue message to local ID %u",
330                            static_cast<unsigned>(local_id.value())));
331     return;
332   }
333 }
334
335 void Channel::OnReadMessageForChannel(
336     const MessageInTransit::View& message_view,
337     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
338   DCHECK(creation_thread_checker_.CalledOnValidThread());
339   DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
340
341   // Currently, no channel messages take platform handles.
342   if (platform_handles) {
343     HandleRemoteError(
344         "Received invalid channel message (has platform handles)");
345     NOTREACHED();
346     return;
347   }
348
349   switch (message_view.subtype()) {
350     case MessageInTransit::kSubtypeChannelAttachAndRunEndpoint:
351       DVLOG(2) << "Handling channel message to attach and run message pipe "
352                   "(local ID " << message_view.destination_id()
353                << ", remote ID " << message_view.source_id() << ")";
354       if (!OnAttachAndRunEndpoint(message_view.destination_id(),
355                                   message_view.source_id())) {
356         HandleRemoteError(
357             "Received invalid channel message to attach and run message pipe");
358       }
359       break;
360     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
361       DVLOG(2) << "Handling channel message to remove message pipe (local ID "
362                << message_view.destination_id() << ", remote ID "
363                << message_view.source_id() << ")";
364       if (!OnRemoveMessagePipeEndpoint(message_view.destination_id(),
365                                        message_view.source_id())) {
366         HandleRemoteError(
367             "Received invalid channel message to remove message pipe");
368       }
369       break;
370     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
371       DVLOG(2) << "Handling channel message to ack remove message pipe (local "
372                   "ID " << message_view.destination_id() << ", remote ID "
373                << message_view.source_id() << ")";
374       if (!OnRemoveMessagePipeEndpointAck(message_view.destination_id())) {
375         HandleRemoteError(
376             "Received invalid channel message to ack remove message pipe");
377       }
378       break;
379     default:
380       HandleRemoteError("Received invalid channel message");
381       NOTREACHED();
382       break;
383   }
384 }
385
386 bool Channel::OnAttachAndRunEndpoint(ChannelEndpointId local_id,
387                                      ChannelEndpointId remote_id) {
388   // We should only get this for remotely-created local endpoints, so our local
389   // ID should be "remote".
390   if (!local_id.is_valid() || !local_id.is_remote()) {
391     DVLOG(2) << "Received attach and run endpoint with invalid local ID";
392     return false;
393   }
394
395   // Conversely, the remote end should be "local".
396   if (!remote_id.is_valid() || remote_id.is_remote()) {
397     DVLOG(2) << "Received attach and run endpoint with invalid remote ID";
398     return false;
399   }
400
401   // Create a message pipe and thus an endpoint (outside the lock).
402   scoped_refptr<ChannelEndpoint> endpoint;
403   scoped_refptr<MessagePipe> message_pipe(
404       MessagePipe::CreateLocalProxy(&endpoint));
405
406   bool success = true;
407   {
408     base::AutoLock locker(lock_);
409
410     if (local_id_to_endpoint_map_.find(local_id) ==
411         local_id_to_endpoint_map_.end()) {
412       DCHECK(incoming_message_pipes_.find(local_id) ==
413              incoming_message_pipes_.end());
414
415       // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll
416       // avoid some refcount churn.)
417       local_id_to_endpoint_map_[local_id] = endpoint;
418       incoming_message_pipes_[local_id] = message_pipe;
419     } else {
420       // We need to call |Close()| on the message pipe outside the lock.
421       success = false;
422     }
423   }
424   if (!success) {
425     DVLOG(2) << "Received attach and run endpoint for existing local ID";
426     message_pipe->Close(0);
427     return false;
428   }
429
430   endpoint->AttachAndRun(this, local_id, remote_id);
431   return true;
432 }
433
434 bool Channel::OnRemoveMessagePipeEndpoint(ChannelEndpointId local_id,
435                                           ChannelEndpointId remote_id) {
436   DCHECK(creation_thread_checker_.CalledOnValidThread());
437
438   scoped_refptr<ChannelEndpoint> endpoint;
439   {
440     base::AutoLock locker(lock_);
441
442     IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
443     if (it == local_id_to_endpoint_map_.end()) {
444       DVLOG(2) << "Remove message pipe endpoint error: not found";
445       return false;
446     }
447
448     if (!it->second.get()) {
449       // Remove messages "crossed"; we have to wait for the ack.
450       return true;
451     }
452
453     endpoint = it->second;
454     local_id_to_endpoint_map_.erase(it);
455     // Detach and send the remove ack message outside the lock.
456   }
457
458   endpoint->DetachFromChannel();
459
460   if (!SendControlMessage(
461           MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
462           local_id,
463           remote_id)) {
464     HandleLocalError(base::StringPrintf(
465         "Failed to send message to remove remote message pipe endpoint ack "
466         "(local ID %u, remote ID %u)",
467         static_cast<unsigned>(local_id.value()),
468         static_cast<unsigned>(remote_id.value())));
469   }
470
471   endpoint->OnDisconnect();
472   return true;
473 }
474
475 bool Channel::OnRemoveMessagePipeEndpointAck(ChannelEndpointId local_id) {
476   DCHECK(creation_thread_checker_.CalledOnValidThread());
477
478   base::AutoLock locker(lock_);
479
480   IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
481   if (it == local_id_to_endpoint_map_.end()) {
482     DVLOG(2) << "Remove message pipe endpoint ack error: not found";
483     return false;
484   }
485
486   if (it->second.get()) {
487     DVLOG(2) << "Remove message pipe endpoint ack error: wrong state";
488     return false;
489   }
490
491   local_id_to_endpoint_map_.erase(it);
492   return true;
493 }
494
495 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
496                                  ChannelEndpointId local_id,
497                                  ChannelEndpointId remote_id) {
498   DVLOG(2) << "Sending channel control message: subtype " << subtype
499            << ", local ID " << local_id << ", remote ID " << remote_id;
500   scoped_ptr<MessageInTransit> message(new MessageInTransit(
501       MessageInTransit::kTypeChannel, subtype, 0, nullptr));
502   message->set_source_id(local_id);
503   message->set_destination_id(remote_id);
504   return WriteMessage(message.Pass());
505 }
506
507 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
508   // TODO(vtl): Is this how we really want to handle this? Probably we want to
509   // terminate the connection, since it's spewing invalid stuff.
510   LOG(WARNING) << error_message;
511 }
512
513 void Channel::HandleLocalError(const base::StringPiece& error_message) {
514   // TODO(vtl): Is this how we really want to handle this?
515   // Sometimes we'll want to propagate the error back to the message pipe
516   // (endpoint), and notify it that the remote is (effectively) closed.
517   // Sometimes we'll want to kill the channel (and notify all the endpoints that
518   // their remotes are dead.
519   LOG(WARNING) << error_message;
520 }
521
522 }  // namespace system
523 }  // namespace mojo