1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/channel.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
// Compile-time check that the bootstrap endpoint ID is not the same value as
// the invalid endpoint ID.
21 static_assert(Channel::kBootstrapEndpointId !=
22 MessageInTransit::kInvalidEndpointId,
23 "kBootstrapEndpointId is invalid");
// Out-of-class definition of the static constant member
// |Channel::kBootstrapEndpointId|. NOTE(review): |STATIC_CONST_MEMBER_DEFINITION|
// is defined elsewhere; presumably a toolchain portability macro -- confirm.
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
26 Channel::kBootstrapEndpointId;
// Constructs a channel that stores |platform_support|. The channel starts out
// with |is_shutting_down_| false, and local endpoint ID allocation starts at
// |kBootstrapEndpointId|.
28 Channel::Channel(embedder::PlatformSupport* platform_support)
29 : platform_support_(platform_support),
31 is_shutting_down_(false),
32 next_local_id_(kBootstrapEndpointId) {
// Takes ownership of |raw_channel|, stores it in |raw_channel_| and initializes
// it, passing this object to |RawChannel::Init()|. Must be called on the
// creation thread (DCHECKed).
// NOTE(review): the body of the failure branch and the return statements are
// not visible here; presumably returns false when |RawChannel::Init()| fails --
// confirm.
35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
36 DCHECK(creation_thread_checker_.CalledOnValidThread());
39 // No need to take |lock_|, since this must be called before this object
40 // becomes thread-safe.
42 raw_channel_ = raw_channel.Pass();
44 if (!raw_channel_->Init(this)) {
// Shuts the channel down: shuts down |raw_channel_| and detaches every endpoint
// that was still registered in |local_id_to_endpoint_map_|. Must be called on
// the creation thread (DCHECKed).
53 void Channel::Shutdown() {
54 DCHECK(creation_thread_checker_.CalledOnValidThread());
// Receives the contents of |local_id_to_endpoint_map_| (via the swap below), so
// that the endpoints can be processed without holding |lock_|.
56 IdToEndpointMap to_destroy;
58 base::AutoLock locker(lock_);
62 // Note: Don't reset |raw_channel_|, in case we're being called from within
63 // |OnReadMessage()| or |OnError()|.
64 raw_channel_->Shutdown();
67 // We need to deal with it outside the lock.
68 std::swap(to_destroy, local_id_to_endpoint_map_);
// Counters used only for the verbose log message at the end.
// NOTE(review): the declaration of |num_live| and the statements updating both
// counters are not visible here.
72 size_t num_zombies = 0;
73 for (IdToEndpointMap::iterator it = to_destroy.begin();
74 it != to_destroy.end();
76 if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) {
// Normal-state endpoint: call |OnRemove()| on its message pipe for its port.
77 it->second->message_pipe_->OnRemove(it->second->port_);
// Sanity check that the endpoint has no message pipe attached.
// NOTE(review): presumably this is the non-normal ("zombie") branch -- confirm.
80 DCHECK(!it->second->message_pipe_.get());
// Every endpoint taken from the map is detached from this channel.
83 it->second->DetachFromChannel();
85 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
86 << " live endpoints and " << num_zombies
// Records, under |lock_|, that the channel is about to be shut down by setting
// |is_shutting_down_|. Other methods in this file consult the flag only to
// decide whether to emit (or suppress) log messages.
90 void Channel::WillShutdownSoon() {
91 base::AutoLock locker(lock_);
92 is_shutting_down_ = true;
// Registers |endpoint| in |local_id_to_endpoint_map_| under a local ID taken
// from |next_local_id_|, then informs the endpoint of this channel and its ID
// via |AttachToChannel()|. |endpoint| must be non-null (DCHECKed).
// NOTE(review): the return statement is not visible here; presumably the chosen
// local ID is returned -- confirm.
//
95 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
96 // keeps the endpoint alive even after the lock is released. Otherwise, there's
97 // the temptation to simply pass the result of |new ChannelEndpoint(...)|
98 // directly to this function, which wouldn't be sufficient for safety.
99 MessageInTransit::EndpointId Channel::AttachEndpoint(
100 scoped_refptr<ChannelEndpoint> endpoint) {
101 DCHECK(endpoint.get());
103 MessageInTransit::EndpointId local_id;
105 base::AutoLock locker(lock_);
// Attaching during shutdown is only logged (Debug), not rejected.
107 DLOG_IF(WARNING, is_shutting_down_)
108 << "AttachEndpoint() while shutting down";
// Keep looping while |next_local_id_| is the invalid ID or is already present
// in the map. NOTE(review): the loop body is not visible here; presumably it
// advances |next_local_id_| -- confirm.
110 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
111 local_id_to_endpoint_map_.find(next_local_id_) !=
112 local_id_to_endpoint_map_.end())
115 local_id = next_local_id_;
117 local_id_to_endpoint_map_[local_id] = endpoint;
120 endpoint->AttachToChannel(this, local_id);
// Starts running the endpoint registered under |local_id|, telling it that its
// remote counterpart has ID |remote_id|. The endpoint and its state are read
// under |lock_|; |Run()| is called on the retained reference afterwards.
// Endpoints that are not in |STATE_NORMAL| ("zombies") are ignored with a
// verbose log message.
// NOTE(review): the return statements are not visible here; presumably returns
// false when |local_id| is not in the map -- confirm.
124 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
125 MessageInTransit::EndpointId remote_id) {
// Holds a reference to the endpoint so it can be used after the lookup.
126 scoped_refptr<ChannelEndpoint> endpoint;
127 ChannelEndpoint::State state;
129 base::AutoLock locker(lock_);
131 DLOG_IF(WARNING, is_shutting_down_)
132 << "RunMessagePipeEndpoint() while shutting down";
134 IdToEndpointMap::const_iterator it =
135 local_id_to_endpoint_map_.find(local_id);
136 if (it == local_id_to_endpoint_map_.end())
138 endpoint = it->second;
139 state = it->second->state_;
142 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
144 if (state != ChannelEndpoint::STATE_NORMAL) {
145 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
146 "(local ID " << local_id << ", remote ID " << remote_id << ")";
150 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
151 // running when we're here (due to |kSubtypeChannelRunMessagePipeEndpoint|).
152 endpoint->Run(remote_id);
// Asks the remote side to run its endpoint by sending a
// |kSubtypeChannelRunMessagePipeEndpoint| control message. |local_id| must
// already be registered in |local_id_to_endpoint_map_| (DCHECKed under
// |lock_|). A failure to send is reported through |HandleLocalError()|.
156 void Channel::RunRemoteMessagePipeEndpoint(
157 MessageInTransit::EndpointId local_id,
158 MessageInTransit::EndpointId remote_id) {
161 base::AutoLock locker(lock_);
162 DCHECK(local_id_to_endpoint_map_.find(local_id) !=
163 local_id_to_endpoint_map_.end());
167 if (!SendControlMessage(
168 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
171 HandleLocalError(base::StringPrintf(
172 "Failed to send message to run remote message pipe endpoint (local ID "
174 static_cast<unsigned>(local_id),
175 static_cast<unsigned>(remote_id)));
// Writes |message| (ownership is passed on) to |raw_channel_| while holding
// |lock_| and returns the result of |RawChannel::WriteMessage()|. Writing while
// |is_shutting_down_| is set is only logged (Debug), not rejected.
// NOTE(review): the condition guarding the "after shutdown" warning below, and
// whatever that branch returns, are not visible here.
179 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
180 base::AutoLock locker(lock_);
182 // TODO(vtl): I think this is probably not an error condition, but I should
183 // think about it (and the shutdown sequence) more carefully.
184 LOG(WARNING) << "WriteMessage() after shutdown";
188 DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
189 return raw_channel_->WriteMessage(message.Pass());
// Returns the result of |RawChannel::IsWriteBufferEmpty()| on |raw_channel_|,
// queried while holding |lock_|.
// NOTE(review): lines between taking the lock and the return are not visible
// here.
192 bool Channel::IsWriteBufferEmpty() {
193 base::AutoLock locker(lock_);
196 return raw_channel_->IsWriteBufferEmpty();
// Detaches the endpoint registered under |local_id| (which must be a valid ID
// and present in |local_id_to_endpoint_map_|; both are DCHECKed). What happens
// depends on the endpoint's state, examined under |lock_|:
//   - |STATE_NORMAL|: the endpoint moves to |STATE_WAIT_REMOTE_REMOVE_ACK| and
//     drops its message pipe; a remove message is then sent to the remote side.
//   - |STATE_WAIT_LOCAL_DETACH|: the endpoint is erased from the map and
//     detached from the channel once the locked section is over.
//   - |STATE_WAIT_REMOTE_REMOVE_ACK|: handling not visible here.
// A failure to send the remove message is reported via |HandleLocalError()|.
199 void Channel::DetachMessagePipeEndpoint(
200 MessageInTransit::EndpointId local_id,
201 MessageInTransit::EndpointId remote_id) {
202 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
204 // If this is non-null after the locked block, the endpoint should be detached
205 // (and no remove message sent).
206 scoped_refptr<ChannelEndpoint> endpoint_to_detach;
// NOTE(review): this local is named |locker_| (trailing underscore), whereas
// the other methods in this file name it |locker|; consider renaming.
208 base::AutoLock locker_(lock_);
212 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
213 DCHECK(it != local_id_to_endpoint_map_.end());
215 switch (it->second->state_) {
216 case ChannelEndpoint::STATE_NORMAL:
217 it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK;
218 it->second->message_pipe_ = nullptr;
// NOTE(review): the statement controlled by this check is not visible here.
219 if (remote_id == MessageInTransit::kInvalidEndpointId)
221 // We have to send a remove message (outside the lock).
223 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
224 endpoint_to_detach = it->second;
225 local_id_to_endpoint_map_.erase(it);
226 // We have to detach (outside the lock).
228 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
233 if (endpoint_to_detach.get()) {
234 endpoint_to_detach->DetachFromChannel();
238 if (!SendControlMessage(
239 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
242 HandleLocalError(base::StringPrintf(
243 "Failed to send message to remove remote message pipe endpoint (local "
244 "ID %u, remote ID %u)",
245 static_cast<unsigned>(local_id),
246 static_cast<unsigned>(remote_id)));
// Returns the value reported by |raw_channel_|'s
// |GetSerializedPlatformHandleSize()|; this method only forwards the call.
250 size_t Channel::GetSerializedPlatformHandleSize() const {
251 return raw_channel_->GetSerializedPlatformHandleSize();
// Destructor. Only verifies (DCHECK) that |is_running_| is already false by the
// time the object is destroyed.
254 Channel::~Channel() {
255 // The channel should have been shut down first.
256 DCHECK(!is_running_);
// Entry point for a message that has been read; must be called on the creation
// thread (DCHECKed). Dispatches on the message type:
//   - message pipe endpoint / message pipe messages go to
//     |OnReadMessageForDownstream()|;
//   - channel messages go to |OnReadMessageForChannel()|;
//   - any other type produces an "invalid type" error message.
// Ownership of |platform_handles| is passed on to the chosen handler.
259 void Channel::OnReadMessage(
260 const MessageInTransit::View& message_view,
261 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
262 DCHECK(creation_thread_checker_.CalledOnValidThread());
264 switch (message_view.type()) {
265 case MessageInTransit::kTypeMessagePipeEndpoint:
266 case MessageInTransit::kTypeMessagePipe:
267 OnReadMessageForDownstream(message_view, platform_handles.Pass());
269 case MessageInTransit::kTypeChannel:
270 OnReadMessageForChannel(message_view, platform_handles.Pass());
// NOTE(review): the label of this branch and the call that receives the
// formatted string are not visible here; presumably the default case reporting
// via |HandleRemoteError()| -- confirm.
274 base::StringPrintf("Received message of invalid type %u",
275 static_cast<unsigned>(message_view.type())));
// Reports an error raised by the raw channel; must be called on the creation
// thread (DCHECKed). The visible code only logs, with a severity that depends
// on the kind of error.
// NOTE(review): the |switch| header, the label of the write-error branch and
// anything following the switch are not visible here.
280 void Channel::OnError(Error error) {
281 DCHECK(creation_thread_checker_.CalledOnValidThread());
284 case ERROR_READ_SHUTDOWN:
285 // The other side was cleanly closed, so this isn't actually an error.
286 DVLOG(1) << "RawChannel read error (shutdown)";
// A broken connection is logged as an error only if the channel has not been
// flagged as shutting down; |lock_| is taken to read |is_shutting_down_|.
288 case ERROR_READ_BROKEN: {
289 base::AutoLock locker(lock_);
290 LOG_IF(ERROR, !is_shutting_down_)
291 << "RawChannel read error (connection broken)";
294 case ERROR_READ_BAD_MESSAGE:
295 // Receiving a bad message means either a bug, data corruption, or
296 // malicious attack (probably due to some other bug).
297 LOG(ERROR) << "RawChannel read error (received bad message)";
299 case ERROR_READ_UNKNOWN:
300 LOG(ERROR) << "RawChannel read error (unknown)";
303 // Write errors are slightly notable: they probably shouldn't happen under
304 // normal operation (but maybe the other side crashed).
305 LOG(WARNING) << "RawChannel write error";
// Delivers a message addressed to a local endpoint. Must be called on the
// creation thread, with a message of type |kTypeMessagePipeEndpoint| or
// |kTypeMessagePipe| (both DCHECKed). Steps:
//   1. Reject messages whose destination ID is the invalid ID.
//   2. Under |lock_|, look up the endpoint and copy out its state, message pipe
//      and port.
//   3. Report an unknown destination ID as a remote error.
//   4. Ignore messages for endpoints that are not in |STATE_NORMAL|.
//   5. Copy the message, deserialize any dispatchers from its transport data,
//      and enqueue it on the message pipe.
311 void Channel::OnReadMessageForDownstream(
312 const MessageInTransit::View& message_view,
313 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
314 DCHECK(creation_thread_checker_.CalledOnValidThread());
315 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
316 message_view.type() == MessageInTransit::kTypeMessagePipe);
318 MessageInTransit::EndpointId local_id = message_view.destination_id();
319 if (local_id == MessageInTransit::kInvalidEndpointId) {
320 HandleRemoteError("Received message with no destination ID");
// Values copied out of the endpoint under the lock, for use afterwards.
// NOTE(review): the declaration of |port| is not visible here.
324 ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL;
325 scoped_refptr<MessagePipe> message_pipe;
// Set under the lock when |local_id| is not in the map; the error itself is
// reported later.
327 bool nonexistent_local_id_error = false;
329 base::AutoLock locker(lock_);
331 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
332 // be called from the creation thread, |raw_channel_| should never be null
336 IdToEndpointMap::const_iterator it =
337 local_id_to_endpoint_map_.find(local_id);
338 if (it == local_id_to_endpoint_map_.end()) {
339 nonexistent_local_id_error = true;
341 state = it->second->state_;
342 message_pipe = it->second->message_pipe_;
343 port = it->second->port_;
346 if (nonexistent_local_id_error) {
347 HandleRemoteError(base::StringPrintf(
348 "Received a message for nonexistent local destination ID %u",
349 static_cast<unsigned>(local_id)));
350 // This is strongly indicative of some problem. However, it's not a fatal
351 // error, since it may indicate a buggy (or hostile) remote process. Don't
352 // die even for Debug builds, since handling this properly needs to be
353 // tested (TODO(vtl)).
354 DLOG(ERROR) << "This should not happen under normal operation.";
358 // Ignore messages for zombie endpoints (not an error).
359 if (state != ChannelEndpoint::STATE_NORMAL) {
360 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
361 << local_id << ", remote ID = " << message_view.source_id() << ")";
365 // We need to duplicate the message (data), because |EnqueueMessage()| will
366 // take ownership of it.
367 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
// If the message carries transport data, turn it (together with the received
// platform handles) back into dispatchers attached to the copied message.
368 if (message_view.transport_data_buffer_size() > 0) {
369 DCHECK(message_view.transport_data_buffer());
370 message->SetDispatchers(TransportData::DeserializeDispatchers(
371 message_view.transport_data_buffer(),
372 message_view.transport_data_buffer_size(),
373 platform_handles.Pass(),
// The message is enqueued for the peer of the endpoint's own port.
376 MojoResult result = message_pipe->EnqueueMessage(
377 MessagePipe::GetPeerPort(port), message.Pass());
378 if (result != MOJO_RESULT_OK) {
379 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
380 // has been closed (in an unavoidable race). This might also be a "remote"
381 // error, e.g., if the remote side is sending invalid control messages (to
382 // the message pipe).
383 HandleLocalError(base::StringPrintf(
384 "Failed to enqueue message to local ID %u (result %d)",
385 static_cast<unsigned>(local_id),
386 static_cast<int>(result)));
// Handles a control message addressed to the channel itself. Must be called on
// the creation thread, with a message of type |kTypeChannel| (both DCHECKed).
// Messages carrying platform handles are treated as invalid. Otherwise the
// subtype selects the action: run a message pipe endpoint, remove one, or
// acknowledge a removal; each failing action yields an "invalid channel
// message" error string, as does an unrecognized subtype.
// NOTE(review): the calls wrapping several of the error strings below are not
// visible here; presumably |HandleRemoteError()| -- confirm.
391 void Channel::OnReadMessageForChannel(
392 const MessageInTransit::View& message_view,
393 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
394 DCHECK(creation_thread_checker_.CalledOnValidThread());
395 DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
397 // Currently, no channel messages take platform handles.
398 if (platform_handles) {
400 "Received invalid channel message (has platform handles)");
// The message's destination ID is used as the local ID and its source ID as the
// remote ID in all of the cases below.
405 switch (message_view.subtype()) {
406 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
407 DVLOG(2) << "Handling channel message to run message pipe (local ID "
408 << message_view.destination_id() << ", remote ID "
409 << message_view.source_id() << ")";
410 if (!RunMessagePipeEndpoint(message_view.destination_id(),
411 message_view.source_id())) {
413 "Received invalid channel message to run message pipe");
416 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
417 DVLOG(2) << "Handling channel message to remove message pipe (local ID "
418 << message_view.destination_id() << ", remote ID "
419 << message_view.source_id() << ")";
420 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
421 message_view.source_id())) {
423 "Received invalid channel message to remove message pipe");
// The ack is handled by the same |RemoveMessagePipeEndpoint()| call as the
// remove message above; only the log and error strings differ.
426 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
427 DVLOG(2) << "Handling channel message to ack remove message pipe (local "
428 "ID " << message_view.destination_id() << ", remote ID "
429 << message_view.source_id() << ")";
430 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
431 message_view.source_id())) {
433 "Received invalid channel message to ack remove message pipe");
437 HandleRemoteError("Received invalid channel message");
// Handles a remove (or remove-ack) control message for the endpoint registered
// under |local_id|. Must be called on the creation thread (DCHECKed). The
// endpoint's state, examined under |lock_|, determines the action:
//   - |STATE_NORMAL|: move to |STATE_WAIT_LOCAL_DETACH|, take over the
//     endpoint's message pipe reference and port, then send a remove-ack
//     message and call |OnRemove()| on the message pipe.
//   - |STATE_WAIT_LOCAL_DETACH|: logged as "wrong state".
//   - |STATE_WAIT_REMOTE_REMOVE_ACK|: erase the endpoint from the map and
//     detach it from the channel.
// NOTE(review): the return statements and the declaration of |port| are not
// visible here; the caller treats a false result as an invalid message.
443 bool Channel::RemoveMessagePipeEndpoint(
444 MessageInTransit::EndpointId local_id,
445 MessageInTransit::EndpointId remote_id) {
446 DCHECK(creation_thread_checker_.CalledOnValidThread());
448 // If this is non-null after the locked block, the endpoint should be detached
449 // (and no remove ack message sent).
450 scoped_refptr<ChannelEndpoint> endpoint_to_detach;
// Keeps the message pipe alive after the endpoint's own reference is cleared.
451 scoped_refptr<MessagePipe> message_pipe;
454 base::AutoLock locker(lock_);
456 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
457 if (it == local_id_to_endpoint_map_.end()) {
458 DVLOG(2) << "Remove message pipe error: not found";
462 switch (it->second->state_) {
463 case ChannelEndpoint::STATE_NORMAL:
464 it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH;
465 message_pipe = it->second->message_pipe_;
466 port = it->second->port_;
467 it->second->message_pipe_ = nullptr;
468 // We have to send a remove ack message (outside the lock).
470 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
471 DVLOG(2) << "Remove message pipe error: wrong state";
473 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
474 endpoint_to_detach = it->second;
475 local_id_to_endpoint_map_.erase(it);
476 // We have to detach (outside the lock).
480 if (endpoint_to_detach.get()) {
481 endpoint_to_detach->DetachFromChannel();
485 if (!SendControlMessage(
486 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
489 HandleLocalError(base::StringPrintf(
490 "Failed to send message to remove remote message pipe endpoint ack "
491 "(local ID %u, remote ID %u)",
492 static_cast<unsigned>(local_id),
493 static_cast<unsigned>(remote_id)));
// Call |OnRemove()| on the message pipe taken from the endpoint, for the
// endpoint's port.
496 message_pipe->OnRemove(port);
// Builds a |kTypeChannel| message with the given |subtype|, sets its source ID
// to |local_id| and its destination ID to |remote_id|, and sends it through
// |WriteMessage()|. Returns whatever |WriteMessage()| returns.
501 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
502 MessageInTransit::EndpointId local_id,
503 MessageInTransit::EndpointId remote_id) {
504 DVLOG(2) << "Sending channel control message: subtype " << subtype
505 << ", local ID " << local_id << ", remote ID " << remote_id;
// NOTE(review): the trailing |0, nullptr| arguments presumably mean "no payload
// bytes" -- confirm against the |MessageInTransit| constructor.
506 scoped_ptr<MessageInTransit> message(new MessageInTransit(
507 MessageInTransit::kTypeChannel, subtype, 0, nullptr));
508 message->set_source_id(local_id);
509 message->set_destination_id(remote_id);
510 return WriteMessage(message.Pass());
// Reports an error attributed to the remote side. Currently this only logs
// |error_message| at WARNING severity.
513 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
514 // TODO(vtl): Is this how we really want to handle this? Probably we want to
515 // terminate the connection, since it's spewing invalid stuff.
516 LOG(WARNING) << error_message;
// Reports an error that occurred on the local side. Currently this only logs
// |error_message| at WARNING severity.
519 void Channel::HandleLocalError(const base::StringPiece& error_message) {
520 // TODO(vtl): Is this how we really want to handle this?
521 // Sometimes we'll want to propagate the error back to the message pipe
522 // (endpoint), and notify it that the remote is (effectively) closed.
523 // Sometimes we'll want to kill the channel (and notify all the endpoints that
524 // their remotes are dead).
525 LOG(WARNING) << error_message;
528 } // namespace system