Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / mojo / system / channel.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/channel.h"
6
7 #include <algorithm>
8
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
17
18 namespace mojo {
19 namespace system {
20
21 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
22                    MessageInTransit::kInvalidEndpointId,
23                kBootstrapEndpointId_is_invalid);
24
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
26     Channel::kBootstrapEndpointId;
27
28 Channel::EndpointInfo::EndpointInfo() : state(STATE_NORMAL), port() {
29 }
30
31 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
32                                     unsigned port)
33     : state(STATE_NORMAL), message_pipe(message_pipe), port(port) {
34 }
35
36 Channel::EndpointInfo::~EndpointInfo() {
37 }
38
39 Channel::Channel()
40     : is_running_(false),
41       is_shutting_down_(false),
42       next_local_id_(kBootstrapEndpointId) {
43 }
44
45 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
46   DCHECK(creation_thread_checker_.CalledOnValidThread());
47   DCHECK(raw_channel);
48
49   // No need to take |lock_|, since this must be called before this object
50   // becomes thread-safe.
51   DCHECK(!is_running_);
52   raw_channel_ = raw_channel.Pass();
53
54   if (!raw_channel_->Init(this)) {
55     raw_channel_.reset();
56     return false;
57   }
58
59   is_running_ = true;
60   return true;
61 }
62
63 void Channel::Shutdown() {
64   DCHECK(creation_thread_checker_.CalledOnValidThread());
65
66   IdToEndpointInfoMap to_destroy;
67   {
68     base::AutoLock locker(lock_);
69     if (!is_running_)
70       return;
71
72     // Note: Don't reset |raw_channel_|, in case we're being called from within
73     // |OnReadMessage()| or |OnError()|.
74     raw_channel_->Shutdown();
75     is_running_ = false;
76
77     // We need to deal with it outside the lock.
78     std::swap(to_destroy, local_id_to_endpoint_info_map_);
79   }
80
81   size_t num_live = 0;
82   size_t num_zombies = 0;
83   for (IdToEndpointInfoMap::iterator it = to_destroy.begin();
84        it != to_destroy.end();
85        ++it) {
86     if (it->second.state == EndpointInfo::STATE_NORMAL) {
87       it->second.message_pipe->OnRemove(it->second.port);
88       num_live++;
89     } else {
90       DCHECK(!it->second.message_pipe);
91       num_zombies++;
92     }
93   }
94   DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
95                                        << " live endpoints and " << num_zombies
96                                        << " zombies";
97 }
98
99 void Channel::WillShutdownSoon() {
100   base::AutoLock locker(lock_);
101   is_shutting_down_ = true;
102 }
103
104 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
105     scoped_refptr<MessagePipe> message_pipe,
106     unsigned port) {
107   DCHECK(message_pipe);
108   DCHECK(port == 0 || port == 1);
109
110   MessageInTransit::EndpointId local_id;
111   {
112     base::AutoLock locker(lock_);
113
114     DLOG_IF(WARNING, is_shutting_down_)
115         << "AttachMessagePipeEndpoint() while shutting down";
116
117     while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
118            local_id_to_endpoint_info_map_.find(next_local_id_) !=
119                local_id_to_endpoint_info_map_.end())
120       next_local_id_++;
121
122     local_id = next_local_id_;
123     next_local_id_++;
124
125     // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
126     // some expensive reference count increment/decrements.) Once this is done,
127     // we should be able to delete |EndpointInfo|'s default constructor.
128     local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
129   }
130
131   // This might fail if that port got an |OnPeerClose()| before attaching.
132   if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
133     return local_id;
134
135   // Note: If it failed, quite possibly the endpoint info was removed from that
136   // map (there's a race between us adding it to the map above and calling
137   // |Attach()|). And even if an entry exists for |local_id|, we need to check
138   // that it's the one we added (and not some other one that was added since).
139   {
140     base::AutoLock locker(lock_);
141     IdToEndpointInfoMap::iterator it =
142         local_id_to_endpoint_info_map_.find(local_id);
143     if (it != local_id_to_endpoint_info_map_.end() &&
144         it->second.message_pipe.get() == message_pipe.get() &&
145         it->second.port == port) {
146       DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
147       // TODO(vtl): FIXME -- This is wrong. We need to specify (to
148       // |AttachMessagePipeEndpoint()| who's going to be responsible for calling
149       // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
150       // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
151       // run, then we'll get messages to an "invalid" local ID (for running, for
152       // removal).
153       local_id_to_endpoint_info_map_.erase(it);
154     }
155   }
156   return MessageInTransit::kInvalidEndpointId;
157 }
158
159 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
160                                      MessageInTransit::EndpointId remote_id) {
161   EndpointInfo endpoint_info;
162   {
163     base::AutoLock locker(lock_);
164
165     DLOG_IF(WARNING, is_shutting_down_)
166         << "RunMessagePipeEndpoint() while shutting down";
167
168     IdToEndpointInfoMap::const_iterator it =
169         local_id_to_endpoint_info_map_.find(local_id);
170     if (it == local_id_to_endpoint_info_map_.end())
171       return false;
172     endpoint_info = it->second;
173   }
174
175   // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
176   // and ignore it.
177   if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
178     DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
179                 "(local ID " << local_id << ", remote ID " << remote_id << ")";
180     return true;
181   }
182
183   // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
184   // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
185   endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
186   return true;
187 }
188
189 void Channel::RunRemoteMessagePipeEndpoint(
190     MessageInTransit::EndpointId local_id,
191     MessageInTransit::EndpointId remote_id) {
192 #if DCHECK_IS_ON
193   {
194     base::AutoLock locker(lock_);
195     DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
196            local_id_to_endpoint_info_map_.end());
197   }
198 #endif
199
200   if (!SendControlMessage(
201           MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
202           local_id,
203           remote_id)) {
204     HandleLocalError(base::StringPrintf(
205         "Failed to send message to run remote message pipe endpoint (local ID "
206         "%u, remote ID %u)",
207         static_cast<unsigned>(local_id),
208         static_cast<unsigned>(remote_id)));
209   }
210 }
211
212 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
213   base::AutoLock locker(lock_);
214   if (!is_running_) {
215     // TODO(vtl): I think this is probably not an error condition, but I should
216     // think about it (and the shutdown sequence) more carefully.
217     LOG(WARNING) << "WriteMessage() after shutdown";
218     return false;
219   }
220
221   DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
222   return raw_channel_->WriteMessage(message.Pass());
223 }
224
225 bool Channel::IsWriteBufferEmpty() {
226   base::AutoLock locker(lock_);
227   if (!is_running_)
228     return true;
229   return raw_channel_->IsWriteBufferEmpty();
230 }
231
232 void Channel::DetachMessagePipeEndpoint(
233     MessageInTransit::EndpointId local_id,
234     MessageInTransit::EndpointId remote_id) {
235   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
236
237   bool should_send_remove_message = false;
238   {
239     base::AutoLock locker_(lock_);
240     if (!is_running_)
241       return;
242
243     IdToEndpointInfoMap::iterator it =
244         local_id_to_endpoint_info_map_.find(local_id);
245     DCHECK(it != local_id_to_endpoint_info_map_.end());
246
247     switch (it->second.state) {
248       case EndpointInfo::STATE_NORMAL:
249         it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
250         it->second.message_pipe = NULL;
251         should_send_remove_message =
252             (remote_id != MessageInTransit::kInvalidEndpointId);
253         break;
254       case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
255         local_id_to_endpoint_info_map_.erase(it);
256         break;
257       case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
258         NOTREACHED();
259         break;
260       case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
261         it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
262         break;
263     }
264   }
265   if (!should_send_remove_message)
266     return;
267
268   if (!SendControlMessage(
269           MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
270           local_id,
271           remote_id)) {
272     HandleLocalError(base::StringPrintf(
273         "Failed to send message to remove remote message pipe endpoint (local "
274         "ID %u, remote ID %u)",
275         static_cast<unsigned>(local_id),
276         static_cast<unsigned>(remote_id)));
277   }
278 }
279
280 size_t Channel::GetSerializedPlatformHandleSize() const {
281   return raw_channel_->GetSerializedPlatformHandleSize();
282 }
283
284 Channel::~Channel() {
285   // The channel should have been shut down first.
286   DCHECK(!is_running_);
287 }
288
289 void Channel::OnReadMessage(
290     const MessageInTransit::View& message_view,
291     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
292   switch (message_view.type()) {
293     case MessageInTransit::kTypeMessagePipeEndpoint:
294     case MessageInTransit::kTypeMessagePipe:
295       OnReadMessageForDownstream(message_view, platform_handles.Pass());
296       break;
297     case MessageInTransit::kTypeChannel:
298       OnReadMessageForChannel(message_view, platform_handles.Pass());
299       break;
300     default:
301       HandleRemoteError(
302           base::StringPrintf("Received message of invalid type %u",
303                              static_cast<unsigned>(message_view.type())));
304       break;
305   }
306 }
307
308 void Channel::OnError(Error error) {
309   switch (error) {
310     case ERROR_READ_SHUTDOWN:
311       // The other side was cleanly closed, so this isn't actually an error.
312       DVLOG(1) << "RawChannel read error (shutdown)";
313       break;
314     case ERROR_READ_BROKEN: {
315       base::AutoLock locker(lock_);
316       LOG_IF(ERROR, !is_shutting_down_)
317           << "RawChannel read error (connection broken)";
318       break;
319     }
320     case ERROR_READ_BAD_MESSAGE:
321       // Receiving a bad message means either a bug, data corruption, or
322       // malicious attack (probably due to some other bug).
323       LOG(ERROR) << "RawChannel read error (received bad message)";
324       break;
325     case ERROR_READ_UNKNOWN:
326       LOG(ERROR) << "RawChannel read error (unknown)";
327       break;
328     case ERROR_WRITE:
329       // Write errors are slightly notable: they probably shouldn't happen under
330       // normal operation (but maybe the other side crashed).
331       LOG(WARNING) << "RawChannel write error";
332       break;
333   }
334   Shutdown();
335 }
336
337 void Channel::OnReadMessageForDownstream(
338     const MessageInTransit::View& message_view,
339     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
340   DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
341          message_view.type() == MessageInTransit::kTypeMessagePipe);
342
343   MessageInTransit::EndpointId local_id = message_view.destination_id();
344   if (local_id == MessageInTransit::kInvalidEndpointId) {
345     HandleRemoteError("Received message with no destination ID");
346     return;
347   }
348
349   EndpointInfo endpoint_info;
350   {
351     base::AutoLock locker(lock_);
352
353     // Since we own |raw_channel_|, and this method and |Shutdown()| should only
354     // be called from the creation thread, |raw_channel_| should never be null
355     // here.
356     DCHECK(is_running_);
357
358     IdToEndpointInfoMap::const_iterator it =
359         local_id_to_endpoint_info_map_.find(local_id);
360     if (it == local_id_to_endpoint_info_map_.end()) {
361       HandleRemoteError(base::StringPrintf(
362           "Received a message for nonexistent local destination ID %u",
363           static_cast<unsigned>(local_id)));
364       // This is strongly indicative of some problem. However, it's not a fatal
365       // error, since it may indicate a bug (or hostile) remote process. Don't
366       // die even for Debug builds, since handling this properly needs to be
367       // tested (TODO(vtl)).
368       DLOG(ERROR) << "This should not happen under normal operation.";
369       return;
370     }
371     endpoint_info = it->second;
372   }
373
374   // Ignore messages for zombie endpoints (not an error).
375   if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
376     DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
377              << local_id << ", remote ID = " << message_view.source_id() << ")";
378     return;
379   }
380
381   // We need to duplicate the message (data), because |EnqueueMessage()| will
382   // take ownership of it.
383   scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
384   if (message_view.transport_data_buffer_size() > 0) {
385     DCHECK(message_view.transport_data_buffer());
386     message->SetDispatchers(TransportData::DeserializeDispatchers(
387         message_view.transport_data_buffer(),
388         message_view.transport_data_buffer_size(),
389         platform_handles.Pass(),
390         this));
391   }
392   MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
393       MessagePipe::GetPeerPort(endpoint_info.port), message.Pass());
394   if (result != MOJO_RESULT_OK) {
395     // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
396     // has been closed (in an unavoidable race). This might also be a "remote"
397     // error, e.g., if the remote side is sending invalid control messages (to
398     // the message pipe).
399     HandleLocalError(base::StringPrintf(
400         "Failed to enqueue message to local ID %u (result %d)",
401         static_cast<unsigned>(local_id),
402         static_cast<int>(result)));
403     return;
404   }
405 }
406
407 void Channel::OnReadMessageForChannel(
408     const MessageInTransit::View& message_view,
409     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
410   DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
411
412   // Currently, no channel messages take platform handles.
413   if (platform_handles) {
414     HandleRemoteError(
415         "Received invalid channel message (has platform handles)");
416     NOTREACHED();
417     return;
418   }
419
420   switch (message_view.subtype()) {
421     case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
422       DVLOG(2) << "Handling channel message to run message pipe (local ID "
423                << message_view.destination_id() << ", remote ID "
424                << message_view.source_id() << ")";
425       if (!RunMessagePipeEndpoint(message_view.destination_id(),
426                                   message_view.source_id())) {
427         HandleRemoteError(
428             "Received invalid channel message to run message pipe");
429       }
430       break;
431     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
432       DVLOG(2) << "Handling channel message to remove message pipe (local ID "
433                << message_view.destination_id() << ", remote ID "
434                << message_view.source_id() << ")";
435       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
436                                      message_view.source_id())) {
437         HandleRemoteError(
438             "Received invalid channel message to remove message pipe");
439       }
440       break;
441     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
442       DVLOG(2) << "Handling channel message to ack remove message pipe (local "
443                   "ID " << message_view.destination_id() << ", remote ID "
444                << message_view.source_id() << ")";
445       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
446                                      message_view.source_id())) {
447         HandleRemoteError(
448             "Received invalid channel message to ack remove message pipe");
449       }
450       break;
451     default:
452       HandleRemoteError("Received invalid channel message");
453       NOTREACHED();
454       break;
455   }
456 }
457
458 bool Channel::RemoveMessagePipeEndpoint(
459     MessageInTransit::EndpointId local_id,
460     MessageInTransit::EndpointId remote_id) {
461   EndpointInfo endpoint_info;
462   {
463     base::AutoLock locker(lock_);
464
465     IdToEndpointInfoMap::iterator it =
466         local_id_to_endpoint_info_map_.find(local_id);
467     if (it == local_id_to_endpoint_info_map_.end()) {
468       DVLOG(2) << "Remove message pipe error: not found";
469       return false;
470     }
471
472     // If it's waiting for the remove ack, just do it and return.
473     if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
474       local_id_to_endpoint_info_map_.erase(it);
475       return true;
476     }
477
478     if (it->second.state != EndpointInfo::STATE_NORMAL) {
479       DVLOG(2) << "Remove message pipe error: wrong state";
480       return false;
481     }
482
483     it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
484     endpoint_info = it->second;
485     it->second.message_pipe = NULL;
486   }
487
488   if (!SendControlMessage(
489           MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
490           local_id,
491           remote_id)) {
492     HandleLocalError(base::StringPrintf(
493         "Failed to send message to remove remote message pipe endpoint ack "
494         "(local ID %u, remote ID %u)",
495         static_cast<unsigned>(local_id),
496         static_cast<unsigned>(remote_id)));
497   }
498
499   endpoint_info.message_pipe->OnRemove(endpoint_info.port);
500
501   return true;
502 }
503
504 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
505                                  MessageInTransit::EndpointId local_id,
506                                  MessageInTransit::EndpointId remote_id) {
507   DVLOG(2) << "Sending channel control message: subtype " << subtype
508            << ", local ID " << local_id << ", remote ID " << remote_id;
509   scoped_ptr<MessageInTransit> message(
510       new MessageInTransit(MessageInTransit::kTypeChannel, subtype, 0, NULL));
511   message->set_source_id(local_id);
512   message->set_destination_id(remote_id);
513   return WriteMessage(message.Pass());
514 }
515
516 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
517   // TODO(vtl): Is this how we really want to handle this? Probably we want to
518   // terminate the connection, since it's spewing invalid stuff.
519   LOG(WARNING) << error_message;
520 }
521
522 void Channel::HandleLocalError(const base::StringPiece& error_message) {
523   // TODO(vtl): Is this how we really want to handle this?
524   // Sometimes we'll want to propagate the error back to the message pipe
525   // (endpoint), and notify it that the remote is (effectively) closed.
526   // Sometimes we'll want to kill the channel (and notify all the endpoints that
527   // their remotes are dead.
528   LOG(WARNING) << error_message;
529 }
530
531 }  // namespace system
532 }  // namespace mojo