1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/channel.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
// Compile-time guard: the bootstrap endpoint ID must differ from the invalid
// endpoint ID.
21 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
22 MessageInTransit::kInvalidEndpointId,
23 kBootstrapEndpointId_is_invalid);
// Out-of-class definition of the |Channel::kBootstrapEndpointId| constant.
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
26 Channel::kBootstrapEndpointId;
// Default constructor: |state| is |STATE_NORMAL|, |port| is value-initialized,
// and |message_pipe| is left default-constructed (it is not in the initializer
// list).
28 Channel::EndpointInfo::EndpointInfo() : state(STATE_NORMAL), port() {
// Constructs endpoint info in |STATE_NORMAL| that refers to |message_pipe| and
// |port|.
31 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
33 : state(STATE_NORMAL), message_pipe(message_pipe), port(port) {
// Out-of-line destructor for |EndpointInfo|.
36 Channel::EndpointInfo::~EndpointInfo() {
// NOTE(review): this looks like the tail of the |Channel| constructor's
// initializer list (its header line is not visible here) -- confirm.
// The channel starts out not shutting down, and local endpoint ID allocation
// starts from |kBootstrapEndpointId|.
41 is_shutting_down_(false),
42 next_local_id_(kBootstrapEndpointId) {
// Takes ownership of |raw_channel| and initializes it, passing |this|.
// Debug builds check that this runs on the creation thread.
// NOTE(review): how an |Init()| failure is handled and what is returned is not
// visible here -- confirm.
45 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
46 DCHECK(creation_thread_checker_.CalledOnValidThread());
49 // No need to take |lock_|, since this must be called before this object
50 // becomes thread-safe.
52 raw_channel_ = raw_channel.Pass();
54 if (!raw_channel_->Init(this)) {
// Shuts down the underlying raw channel and tells every endpoint that is still
// in |STATE_NORMAL| that it has been removed. Debug builds check that this runs
// on the creation thread.
63 void Channel::Shutdown() {
64 DCHECK(creation_thread_checker_.CalledOnValidThread());
// Receives the whole endpoint map so that its entries can be processed below.
66 IdToEndpointInfoMap to_destroy;
68 base::AutoLock locker(lock_);
72 // Note: Don't reset |raw_channel_|, in case we're being called from within
73 // |OnReadMessage()| or |OnError()|.
74 raw_channel_->Shutdown();
77 // We need to deal with it outside the lock.
// |to_destroy| was empty, so the swap leaves the member map empty.
78 std::swap(to_destroy, local_id_to_endpoint_info_map_);
82 size_t num_zombies = 0;
83 for (IdToEndpointInfoMap::iterator it = to_destroy.begin();
84 it != to_destroy.end();
// Endpoints in the normal state still have a message pipe to notify.
86 if (it->second.state == EndpointInfo::STATE_NORMAL) {
87 it->second.message_pipe->OnRemove(it->second.port);
// Debug check that this entry no longer references a message pipe.
// NOTE(review): presumably the non-normal ("zombie") branch -- confirm.
90 DCHECK(!it->second.message_pipe);
// Verbose debug log, emitted only if any endpoint was left at shutdown.
94 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
95 << " live endpoints and " << num_zombies
// Records, under |lock_|, that the channel is about to be shut down. In the
// code visible in this file the flag is only consulted to decide whether to
// log.
99 void Channel::WillShutdownSoon() {
100 base::AutoLock locker(lock_);
101 is_shutting_down_ = true;
// Picks an unused local endpoint ID, records (|message_pipe|, |port|) under it
// in the endpoint map, and then asks the message pipe to attach that port to
// this channel. If attaching fails, the map entry is removed again (only if it
// is still the one added here) and |kInvalidEndpointId| is returned.
// NOTE(review): the success path presumably returns the new local ID; that
// return statement is not visible here -- confirm.
104 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
105 scoped_refptr<MessagePipe> message_pipe,
107 DCHECK(message_pipe);
108 DCHECK(port == 0 || port == 1);
110 MessageInTransit::EndpointId local_id;
112 base::AutoLock locker(lock_);
114 DLOG_IF(WARNING, is_shutting_down_)
115 << "AttachMessagePipeEndpoint() while shutting down";
// Keep looking while the candidate ID is the invalid sentinel or is already
// present in the map.
117 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
118 local_id_to_endpoint_info_map_.find(next_local_id_) !=
119 local_id_to_endpoint_info_map_.end())
122 local_id = next_local_id_;
125 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
126 // some expensive reference count increment/decrements.) Once this is done,
127 // we should be able to delete |EndpointInfo|'s default constructor.
128 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
131 // This might fail if that port got an |OnPeerClose()| before attaching.
132 if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
135 // Note: If it failed, quite possibly the endpoint info was removed from that
136 // map (there's a race between us adding it to the map above and calling
137 // |Attach()|). And even if an entry exists for |local_id|, we need to check
138 // that it's the one we added (and not some other one that was added since).
140 base::AutoLock locker(lock_);
141 IdToEndpointInfoMap::iterator it =
142 local_id_to_endpoint_info_map_.find(local_id);
// Only erase the entry if it still refers to the same pipe and port.
143 if (it != local_id_to_endpoint_info_map_.end() &&
144 it->second.message_pipe.get() == message_pipe.get() &&
145 it->second.port == port) {
146 DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
147 // TODO(vtl): FIXME -- This is wrong. We need to specify (to
148 // |AttachMessagePipeEndpoint()|) who's going to be responsible for calling
149 // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
150 // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
151 // run, then we'll get messages to an "invalid" local ID (for running, for
153 local_id_to_endpoint_info_map_.erase(it);
156 return MessageInTransit::kInvalidEndpointId;
// Looks up the endpoint registered under |local_id| and, when it is in
// |STATE_NORMAL|, tells its message pipe to run the recorded port against
// |remote_id|. Requests for endpoints in any other state are only logged.
// NOTE(review): the individual return values (not-found, zombie, success) are
// on lines not visible here -- confirm.
159 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
160 MessageInTransit::EndpointId remote_id) {
// Local copy of the map entry, filled in while |lock_| is held.
161 EndpointInfo endpoint_info;
163 base::AutoLock locker(lock_);
165 DLOG_IF(WARNING, is_shutting_down_)
166 << "RunMessagePipeEndpoint() while shutting down";
168 IdToEndpointInfoMap::const_iterator it =
169 local_id_to_endpoint_info_map_.find(local_id);
170 if (it == local_id_to_endpoint_info_map_.end())
172 endpoint_info = it->second;
175 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
177 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
178 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
179 "(local ID " << local_id << ", remote ID " << remote_id << ")";
183 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
184 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|.
185 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
// Sends a |kSubtypeChannelRunMessagePipeEndpoint| control message to the other
// side. A send failure is reported through |HandleLocalError()|.
189 void Channel::RunRemoteMessagePipeEndpoint(
190 MessageInTransit::EndpointId local_id,
191 MessageInTransit::EndpointId remote_id) {
194 base::AutoLock locker(lock_);
// Debug-only sanity check that |local_id| is a known endpoint.
195 DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
196 local_id_to_endpoint_info_map_.end());
200 if (!SendControlMessage(
201 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
204 HandleLocalError(base::StringPrintf(
205 "Failed to send message to run remote message pipe endpoint (local ID "
207 static_cast<unsigned>(local_id),
208 static_cast<unsigned>(remote_id)));
// Hands |message| to the raw channel while holding |lock_| and returns the raw
// channel's result.
// NOTE(review): the condition guarding the "after shutdown" warning, and what
// is returned in that case, are on lines not visible here -- confirm.
212 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
213 base::AutoLock locker(lock_);
215 // TODO(vtl): I think this is probably not an error condition, but I should
216 // think about it (and the shutdown sequence) more carefully.
217 LOG(WARNING) << "WriteMessage() after shutdown";
221 DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
222 return raw_channel_->WriteMessage(message.Pass());
// Returns whether the raw channel's write buffer is empty, queried while
// holding |lock_|.
225 bool Channel::IsWriteBufferEmpty() {
226 base::AutoLock locker(lock_);
229 return raw_channel_->IsWriteBufferEmpty();
// Handles the local side detaching endpoint |local_id|: advances that
// endpoint's state and, when required, sends a
// |kSubtypeChannelRemoveMessagePipeEndpoint| control message to the remote.
// |local_id| must be valid and present in the map (both are only checked in
// Debug builds).
232 void Channel::DetachMessagePipeEndpoint(
233 MessageInTransit::EndpointId local_id,
234 MessageInTransit::EndpointId remote_id) {
235 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
// Decided under the lock, acted on further down.
237 bool should_send_remove_message = false;
239 base::AutoLock locker_(lock_);
243 IdToEndpointInfoMap::iterator it =
244 local_id_to_endpoint_info_map_.find(local_id);
245 DCHECK(it != local_id_to_endpoint_info_map_.end());
247 switch (it->second.state) {
// Normal endpoint: drop the pipe reference and wait for the remote's ack. The
// remove message is only sent when |remote_id| is a valid ID.
248 case EndpointInfo::STATE_NORMAL:
249 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
250 it->second.message_pipe = NULL;
251 should_send_remove_message =
252 (remote_id != MessageInTransit::kInvalidEndpointId);
// Only the local detach was outstanding, so the entry is removed.
254 case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
255 local_id_to_endpoint_info_map_.erase(it);
// NOTE(review): the handling of this state is on lines not visible here.
257 case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
// The local detach has now happened; only the remote ack remains.
260 case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
261 it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
265 if (!should_send_remove_message)
268 if (!SendControlMessage(
269 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
272 HandleLocalError(base::StringPrintf(
273 "Failed to send message to remove remote message pipe endpoint (local "
274 "ID %u, remote ID %u)",
275 static_cast<unsigned>(local_id),
276 static_cast<unsigned>(remote_id)));
// Forwards to the raw channel's |GetSerializedPlatformHandleSize()|. Note that
// |lock_| is not taken here.
280 size_t Channel::GetSerializedPlatformHandleSize() const {
281 return raw_channel_->GetSerializedPlatformHandleSize();
// Destructor. Debug builds verify that the channel is no longer running.
284 Channel::~Channel() {
285 // The channel should have been shut down first.
286 DCHECK(!is_running_);
// Dispatches an incoming message by type: message pipe (endpoint) messages go
// to |OnReadMessageForDownstream()|, channel messages go to
// |OnReadMessageForChannel()|. Ownership of |platform_handles| is passed on to
// whichever handler is chosen.
289 void Channel::OnReadMessage(
290 const MessageInTransit::View& message_view,
291 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
292 switch (message_view.type()) {
293 case MessageInTransit::kTypeMessagePipeEndpoint:
294 case MessageInTransit::kTypeMessagePipe:
295 OnReadMessageForDownstream(message_view, platform_handles.Pass());
297 case MessageInTransit::kTypeChannel:
298 OnReadMessageForChannel(message_view, platform_handles.Pass());
// Any other type: an error message naming the type is built.
// NOTE(review): the function it is passed to is on a line not visible here.
302 base::StringPrintf("Received message of invalid type %u",
303 static_cast<unsigned>(message_view.type())));
// Logs a diagnostic for a raw channel error; the log level depends on the kind
// of error.
// NOTE(review): anything done after logging is on lines not visible here.
308 void Channel::OnError(Error error) {
310 case ERROR_READ_SHUTDOWN:
311 // The other side was cleanly closed, so this isn't actually an error.
312 DVLOG(1) << "RawChannel read error (shutdown)";
314 case ERROR_READ_BROKEN: {
// |is_shutting_down_| is read under |lock_|; a broken connection is only
// logged as an error when no shutdown has been announced.
315 base::AutoLock locker(lock_);
316 LOG_IF(ERROR, !is_shutting_down_)
317 << "RawChannel read error (connection broken)";
320 case ERROR_READ_BAD_MESSAGE:
321 // Receiving a bad message means either a bug, data corruption, or
322 // malicious attack (probably due to some other bug).
323 LOG(ERROR) << "RawChannel read error (received bad message)";
325 case ERROR_READ_UNKNOWN:
326 LOG(ERROR) << "RawChannel read error (unknown)";
329 // Write errors are slightly notable: they probably shouldn't happen under
330 // normal operation (but maybe the other side crashed).
331 LOG(WARNING) << "RawChannel write error";
// Delivers an incoming message pipe (endpoint) message to the local endpoint
// named by the message's destination ID. The message is copied out of
// |message_view|, any transport data is deserialized into dispatchers, and the
// result is enqueued on the endpoint's message pipe. Messages with an invalid
// or unknown destination are reported via |HandleRemoteError()|; messages for
// endpoints not in |STATE_NORMAL| are only logged.
337 void Channel::OnReadMessageForDownstream(
338 const MessageInTransit::View& message_view,
339 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
340 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
341 message_view.type() == MessageInTransit::kTypeMessagePipe);
343 MessageInTransit::EndpointId local_id = message_view.destination_id();
344 if (local_id == MessageInTransit::kInvalidEndpointId) {
345 HandleRemoteError("Received message with no destination ID");
// Local copy of the map entry, filled in while |lock_| is held.
349 EndpointInfo endpoint_info;
351 base::AutoLock locker(lock_);
353 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
354 // be called from the creation thread, |raw_channel_| should never be null
358 IdToEndpointInfoMap::const_iterator it =
359 local_id_to_endpoint_info_map_.find(local_id);
360 if (it == local_id_to_endpoint_info_map_.end()) {
361 HandleRemoteError(base::StringPrintf(
362 "Received a message for nonexistent local destination ID %u",
363 static_cast<unsigned>(local_id)));
364 // This is strongly indicative of some problem. However, it's not a fatal
365 // error, since it may indicate a buggy (or hostile) remote process. Don't
366 // die even for Debug builds, since handling this properly needs to be
367 // tested (TODO(vtl)).
368 DLOG(ERROR) << "This should not happen under normal operation.";
371 endpoint_info = it->second;
374 // Ignore messages for zombie endpoints (not an error).
375 if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
376 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
377 << local_id << ", remote ID = " << message_view.source_id() << ")";
381 // We need to duplicate the message (data), because |EnqueueMessage()| will
382 // take ownership of it.
383 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
// Dispatchers are only deserialized when the message carries transport data.
384 if (message_view.transport_data_buffer_size() > 0) {
385 DCHECK(message_view.transport_data_buffer());
386 message->SetDispatchers(TransportData::DeserializeDispatchers(
387 message_view.transport_data_buffer(),
388 message_view.transport_data_buffer_size(),
389 platform_handles.Pass(),
// The message is enqueued on the peer of the recorded port.
392 MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
393 MessagePipe::GetPeerPort(endpoint_info.port), message.Pass());
394 if (result != MOJO_RESULT_OK) {
395 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
396 // has been closed (in an unavoidable race). This might also be a "remote"
397 // error, e.g., if the remote side is sending invalid control messages (to
398 // the message pipe).
399 HandleLocalError(base::StringPrintf(
400 "Failed to enqueue message to local ID %u (result %d)",
401 static_cast<unsigned>(local_id),
402 static_cast<int>(result)));
// Handles an incoming channel control message by subtype: "run" goes to
// |RunMessagePipeEndpoint()|, while both "remove" and "remove ack" go to
// |RemoveMessagePipeEndpoint()|. In each call the message's destination ID is
// used as the local ID and its source ID as the remote ID. Unknown subtypes
// are reported via |HandleRemoteError()|.
407 void Channel::OnReadMessageForChannel(
408 const MessageInTransit::View& message_view,
409 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
410 DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
412 // Currently, no channel messages take platform handles.
413 if (platform_handles) {
415 "Received invalid channel message (has platform handles)");
420 switch (message_view.subtype()) {
421 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
422 DVLOG(2) << "Handling channel message to run message pipe (local ID "
423 << message_view.destination_id() << ", remote ID "
424 << message_view.source_id() << ")";
425 if (!RunMessagePipeEndpoint(message_view.destination_id(),
426 message_view.source_id())) {
428 "Received invalid channel message to run message pipe");
431 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
432 DVLOG(2) << "Handling channel message to remove message pipe (local ID "
433 << message_view.destination_id() << ", remote ID "
434 << message_view.source_id() << ")";
435 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
436 message_view.source_id())) {
438 "Received invalid channel message to remove message pipe");
// The ack uses the same handler as the remove request; that handler tells the
// two cases apart by the endpoint's current state.
441 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
442 DVLOG(2) << "Handling channel message to ack remove message pipe (local "
443 "ID " << message_view.destination_id() << ", remote ID "
444 << message_view.source_id() << ")";
445 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
446 message_view.source_id())) {
448 "Received invalid channel message to ack remove message pipe");
452 HandleRemoteError("Received invalid channel message");
// Handles a remote request (or ack) to remove endpoint |local_id|.
//  - Unknown |local_id|: logged as an error.
//  - Endpoint waiting for the remote's remove ack: its map entry is erased.
//  - Endpoint in any other non-normal state: logged as an error.
//  - Normal endpoint: moved to |STATE_WAIT_LOCAL_DETACH|, an ack control
//    message is sent to the remote, and the message pipe is told of the
//    removal.
// NOTE(review): the return value for each case is on lines not visible here --
// confirm.
458 bool Channel::RemoveMessagePipeEndpoint(
459 MessageInTransit::EndpointId local_id,
460 MessageInTransit::EndpointId remote_id) {
// Local copy of the map entry, used for the |OnRemove()| call at the end.
461 EndpointInfo endpoint_info;
463 base::AutoLock locker(lock_);
465 IdToEndpointInfoMap::iterator it =
466 local_id_to_endpoint_info_map_.find(local_id);
467 if (it == local_id_to_endpoint_info_map_.end()) {
468 DVLOG(2) << "Remove message pipe error: not found";
472 // If it's waiting for the remove ack, just do it and return.
473 if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
474 local_id_to_endpoint_info_map_.erase(it);
478 if (it->second.state != EndpointInfo::STATE_NORMAL) {
479 DVLOG(2) << "Remove message pipe error: wrong state";
// Order matters: the copy is taken after the state change but before the map
// entry's pipe reference is cleared, so the copy still refers to the pipe.
483 it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
484 endpoint_info = it->second;
485 it->second.message_pipe = NULL;
488 if (!SendControlMessage(
489 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
492 HandleLocalError(base::StringPrintf(
493 "Failed to send message to remove remote message pipe endpoint ack "
494 "(local ID %u, remote ID %u)",
495 static_cast<unsigned>(local_id),
496 static_cast<unsigned>(remote_id)));
// Notify the message pipe through the copy taken above.
499 endpoint_info.message_pipe->OnRemove(endpoint_info.port);
// Builds a |kTypeChannel| message of the given |subtype|, with |local_id| as
// its source ID and |remote_id| as its destination ID, and sends it through
// |WriteMessage()|. Returns |WriteMessage()|'s result.
504 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
505 MessageInTransit::EndpointId local_id,
506 MessageInTransit::EndpointId remote_id) {
507 DVLOG(2) << "Sending channel control message: subtype " << subtype
508 << ", local ID " << local_id << ", remote ID " << remote_id;
// NOTE(review): the trailing (0, NULL) arguments presumably mean "no payload
// bytes" -- confirm against the |MessageInTransit| constructor.
509 scoped_ptr<MessageInTransit> message(
510 new MessageInTransit(MessageInTransit::kTypeChannel, subtype, 0, NULL));
511 message->set_source_id(local_id);
512 message->set_destination_id(remote_id);
513 return WriteMessage(message.Pass());
// Reports an error attributed to the remote side. Currently this only logs
// |error_message| as a warning.
516 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
517 // TODO(vtl): Is this how we really want to handle this? Probably we want to
518 // terminate the connection, since it's spewing invalid stuff.
519 LOG(WARNING) << error_message;
// Reports an error detected on the local side. Currently this only logs
// |error_message| as a warning.
522 void Channel::HandleLocalError(const base::StringPiece& error_message) {
523 // TODO(vtl): Is this how we really want to handle this?
524 // Sometimes we'll want to propagate the error back to the message pipe
525 // (endpoint), and notify it that the remote is (effectively) closed.
526 // Sometimes we'll want to kill the channel (and notify all the endpoints that
527 // their remotes are dead).
528 LOG(WARNING) << error_message;
531 } // namespace system