namespace mojo {
namespace system {
-COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
- MessageInTransit::kInvalidEndpointId,
- kBootstrapEndpointId_is_invalid);
+static_assert(Channel::kBootstrapEndpointId !=
+ MessageInTransit::kInvalidEndpointId,
+ "kBootstrapEndpointId is invalid");
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
Channel::kBootstrapEndpointId;
-Channel::EndpointInfo::EndpointInfo() : state(STATE_NORMAL), port() {
-}
-
-Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
- unsigned port)
- : state(STATE_NORMAL), message_pipe(message_pipe), port(port) {
-}
-
-Channel::EndpointInfo::~EndpointInfo() {
-}
-
-Channel::Channel()
- : is_running_(false),
+Channel::Channel(embedder::PlatformSupport* platform_support)
+ : platform_support_(platform_support),
+ is_running_(false),
is_shutting_down_(false),
next_local_id_(kBootstrapEndpointId) {
}
void Channel::Shutdown() {
DCHECK(creation_thread_checker_.CalledOnValidThread());
- IdToEndpointInfoMap to_destroy;
+ IdToEndpointMap to_destroy;
{
base::AutoLock locker(lock_);
if (!is_running_)
is_running_ = false;
// We need to deal with it outside the lock.
- std::swap(to_destroy, local_id_to_endpoint_info_map_);
+ std::swap(to_destroy, local_id_to_endpoint_map_);
}
size_t num_live = 0;
size_t num_zombies = 0;
- for (IdToEndpointInfoMap::iterator it = to_destroy.begin();
+ for (IdToEndpointMap::iterator it = to_destroy.begin();
it != to_destroy.end();
++it) {
- if (it->second.state == EndpointInfo::STATE_NORMAL) {
- it->second.message_pipe->OnRemove(it->second.port);
+ if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) {
+ it->second->message_pipe_->OnRemove(it->second->port_);
num_live++;
} else {
- DCHECK(!it->second.message_pipe);
+ DCHECK(!it->second->message_pipe_.get());
num_zombies++;
}
+ it->second->DetachFromChannel();
}
DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
<< " live endpoints and " << num_zombies
is_shutting_down_ = true;
}
-MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
- scoped_refptr<MessagePipe> message_pipe,
- unsigned port) {
- DCHECK(message_pipe);
- DCHECK(port == 0 || port == 1);
+// Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
+// keeps the endpoint alive even after the lock is released. Otherwise, there's
+// the temptation to simply pass the result of |new ChannelEndpoint(...)|
+// directly to this function, which wouldn't be sufficient for safety.
+MessageInTransit::EndpointId Channel::AttachEndpoint(
+ scoped_refptr<ChannelEndpoint> endpoint) {
+ DCHECK(endpoint.get());
MessageInTransit::EndpointId local_id;
{
base::AutoLock locker(lock_);
DLOG_IF(WARNING, is_shutting_down_)
- << "AttachMessagePipeEndpoint() while shutting down";
+ << "AttachEndpoint() while shutting down";
while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
- local_id_to_endpoint_info_map_.find(next_local_id_) !=
- local_id_to_endpoint_info_map_.end())
+ local_id_to_endpoint_map_.find(next_local_id_) !=
+ local_id_to_endpoint_map_.end())
next_local_id_++;
local_id = next_local_id_;
next_local_id_++;
-
- // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
- // some expensive reference count increment/decrements.) Once this is done,
- // we should be able to delete |EndpointInfo|'s default constructor.
- local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
+ local_id_to_endpoint_map_[local_id] = endpoint;
}
- // This might fail if that port got an |OnPeerClose()| before attaching.
- if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
- return local_id;
-
- // Note: If it failed, quite possibly the endpoint info was removed from that
- // map (there's a race between us adding it to the map above and calling
- // |Attach()|). And even if an entry exists for |local_id|, we need to check
- // that it's the one we added (and not some other one that was added since).
- {
- base::AutoLock locker(lock_);
- IdToEndpointInfoMap::iterator it =
- local_id_to_endpoint_info_map_.find(local_id);
- if (it != local_id_to_endpoint_info_map_.end() &&
- it->second.message_pipe.get() == message_pipe.get() &&
- it->second.port == port) {
- DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
- // TODO(vtl): FIXME -- This is wrong. We need to specify (to
- // |AttachMessagePipeEndpoint()| who's going to be responsible for calling
- // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
- // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
- // run, then we'll get messages to an "invalid" local ID (for running, for
- // removal).
- local_id_to_endpoint_info_map_.erase(it);
- }
- }
- return MessageInTransit::kInvalidEndpointId;
+ endpoint->AttachToChannel(this, local_id);
+ return local_id;
}
bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
- EndpointInfo endpoint_info;
+ scoped_refptr<ChannelEndpoint> endpoint;
+ ChannelEndpoint::State state;
{
base::AutoLock locker(lock_);
DLOG_IF(WARNING, is_shutting_down_)
<< "RunMessagePipeEndpoint() while shutting down";
- IdToEndpointInfoMap::const_iterator it =
- local_id_to_endpoint_info_map_.find(local_id);
- if (it == local_id_to_endpoint_info_map_.end())
+ IdToEndpointMap::const_iterator it =
+ local_id_to_endpoint_map_.find(local_id);
+ if (it == local_id_to_endpoint_map_.end())
return false;
- endpoint_info = it->second;
+ endpoint = it->second;
+ state = it->second->state_;
}
// Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
// and ignore it.
- if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
+ if (state != ChannelEndpoint::STATE_NORMAL) {
DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
"(local ID " << local_id << ", remote ID " << remote_id << ")";
return true;
// TODO(vtl): FIXME -- We need to handle the case that message pipe is already
// running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
- endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
+ endpoint->Run(remote_id);
return true;
}
#if DCHECK_IS_ON
{
base::AutoLock locker(lock_);
- DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
- local_id_to_endpoint_info_map_.end());
+ DCHECK(local_id_to_endpoint_map_.find(local_id) !=
+ local_id_to_endpoint_map_.end());
}
#endif
MessageInTransit::EndpointId remote_id) {
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
- bool should_send_remove_message = false;
+ // If this is non-null after the locked block, the endpoint should be detached
+ // (and no remove message sent).
+ scoped_refptr<ChannelEndpoint> endpoint_to_detach;
{
base::AutoLock locker_(lock_);
if (!is_running_)
return;
- IdToEndpointInfoMap::iterator it =
- local_id_to_endpoint_info_map_.find(local_id);
- DCHECK(it != local_id_to_endpoint_info_map_.end());
+ IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
+ DCHECK(it != local_id_to_endpoint_map_.end());
- switch (it->second.state) {
- case EndpointInfo::STATE_NORMAL:
- it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
- it->second.message_pipe = NULL;
- should_send_remove_message =
- (remote_id != MessageInTransit::kInvalidEndpointId);
+ switch (it->second->state_) {
+ case ChannelEndpoint::STATE_NORMAL:
+ it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK;
+ it->second->message_pipe_ = nullptr;
+ if (remote_id == MessageInTransit::kInvalidEndpointId)
+ return;
+ // We have to send a remove message (outside the lock).
break;
- case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
- local_id_to_endpoint_info_map_.erase(it);
+ case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
+ endpoint_to_detach = it->second;
+ local_id_to_endpoint_map_.erase(it);
+ // We have to detach (outside the lock).
break;
- case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
+ case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
NOTREACHED();
- break;
- case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
- it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
- break;
+ return;
}
}
- if (!should_send_remove_message)
+ if (endpoint_to_detach.get()) {
+ endpoint_to_detach->DetachFromChannel();
return;
+ }
if (!SendControlMessage(
MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
void Channel::OnReadMessage(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles) {
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
+
switch (message_view.type()) {
case MessageInTransit::kTypeMessagePipeEndpoint:
case MessageInTransit::kTypeMessagePipe:
}
void Channel::OnError(Error error) {
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
+
switch (error) {
case ERROR_READ_SHUTDOWN:
// The other side was cleanly closed, so this isn't actually an error.
void Channel::OnReadMessageForDownstream(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles) {
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
message_view.type() == MessageInTransit::kTypeMessagePipe);
return;
}
- EndpointInfo endpoint_info;
+ ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL;
+ scoped_refptr<MessagePipe> message_pipe;
+ unsigned port = ~0u;
+ bool nonexistent_local_id_error = false;
{
base::AutoLock locker(lock_);
// here.
DCHECK(is_running_);
- IdToEndpointInfoMap::const_iterator it =
- local_id_to_endpoint_info_map_.find(local_id);
- if (it == local_id_to_endpoint_info_map_.end()) {
- HandleRemoteError(base::StringPrintf(
- "Received a message for nonexistent local destination ID %u",
- static_cast<unsigned>(local_id)));
- // This is strongly indicative of some problem. However, it's not a fatal
- // error, since it may indicate a bug (or hostile) remote process. Don't
- // die even for Debug builds, since handling this properly needs to be
- // tested (TODO(vtl)).
- DLOG(ERROR) << "This should not happen under normal operation.";
- return;
+ IdToEndpointMap::const_iterator it =
+ local_id_to_endpoint_map_.find(local_id);
+ if (it == local_id_to_endpoint_map_.end()) {
+ nonexistent_local_id_error = true;
+ } else {
+ state = it->second->state_;
+ message_pipe = it->second->message_pipe_;
+ port = it->second->port_;
}
- endpoint_info = it->second;
+ }
+ if (nonexistent_local_id_error) {
+ HandleRemoteError(base::StringPrintf(
+ "Received a message for nonexistent local destination ID %u",
+ static_cast<unsigned>(local_id)));
+ // This is strongly indicative of some problem. However, it's not a fatal
+ // error, since it may indicate a buggy (or hostile) remote process. Don't
+ // die even for Debug builds, since handling this properly needs to be
+ // tested (TODO(vtl)).
+ DLOG(ERROR) << "This should not happen under normal operation.";
+ return;
}
// Ignore messages for zombie endpoints (not an error).
- if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
+ if (state != ChannelEndpoint::STATE_NORMAL) {
DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
<< local_id << ", remote ID = " << message_view.source_id() << ")";
return;
platform_handles.Pass(),
this));
}
- MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
- MessagePipe::GetPeerPort(endpoint_info.port), message.Pass());
+ MojoResult result = message_pipe->EnqueueMessage(
+ MessagePipe::GetPeerPort(port), message.Pass());
if (result != MOJO_RESULT_OK) {
// TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
// has been closed (in an unavoidable race). This might also be a "remote"
void Channel::OnReadMessageForChannel(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles) {
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
// Currently, no channel messages take platform handles.
bool Channel::RemoveMessagePipeEndpoint(
MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
- EndpointInfo endpoint_info;
+ DCHECK(creation_thread_checker_.CalledOnValidThread());
+
+ // If this is non-null after the locked block, the endpoint should be detached
+ // (and no remove ack message sent).
+ scoped_refptr<ChannelEndpoint> endpoint_to_detach;
+ scoped_refptr<MessagePipe> message_pipe;
+ unsigned port = ~0u;
{
base::AutoLock locker(lock_);
- IdToEndpointInfoMap::iterator it =
- local_id_to_endpoint_info_map_.find(local_id);
- if (it == local_id_to_endpoint_info_map_.end()) {
+ IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
+ if (it == local_id_to_endpoint_map_.end()) {
DVLOG(2) << "Remove message pipe error: not found";
return false;
}
- // If it's waiting for the remove ack, just do it and return.
- if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
- local_id_to_endpoint_info_map_.erase(it);
- return true;
- }
-
- if (it->second.state != EndpointInfo::STATE_NORMAL) {
- DVLOG(2) << "Remove message pipe error: wrong state";
- return false;
+ switch (it->second->state_) {
+ case ChannelEndpoint::STATE_NORMAL:
+ it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH;
+ message_pipe = it->second->message_pipe_;
+ port = it->second->port_;
+ it->second->message_pipe_ = nullptr;
+ // We have to send a remove ack message (outside the lock).
+ break;
+ case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
+ DVLOG(2) << "Remove message pipe error: wrong state";
+ return false;
+ case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
+ endpoint_to_detach = it->second;
+ local_id_to_endpoint_map_.erase(it);
+ // We have to detach (outside the lock).
+ break;
}
-
- it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
- endpoint_info = it->second;
- it->second.message_pipe = NULL;
+ }
+ if (endpoint_to_detach.get()) {
+ endpoint_to_detach->DetachFromChannel();
+ return true;
}
if (!SendControlMessage(
static_cast<unsigned>(remote_id)));
}
- endpoint_info.message_pipe->OnRemove(endpoint_info.port);
+ message_pipe->OnRemove(port);
return true;
}
MessageInTransit::EndpointId remote_id) {
DVLOG(2) << "Sending channel control message: subtype " << subtype
<< ", local ID " << local_id << ", remote ID " << remote_id;
- scoped_ptr<MessageInTransit> message(
- new MessageInTransit(MessageInTransit::kTypeChannel, subtype, 0, NULL));
+ scoped_ptr<MessageInTransit> message(new MessageInTransit(
+ MessageInTransit::kTypeChannel, subtype, 0, nullptr));
message->set_source_id(local_id);
message->set_destination_id(remote_id);
return WriteMessage(message.Pass());