1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/channel.h"
7 #include "base/basictypes.h"
9 #include "base/compiler_specific.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/strings/stringprintf.h"
17 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
18 MessageInTransit::kInvalidEndpointId,
19 kBootstrapEndpointId_is_invalid);
21 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
22 Channel::kBootstrapEndpointId;
24 Channel::EndpointInfo::EndpointInfo() {
27 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
29 : message_pipe(message_pipe),
33 Channel::EndpointInfo::~EndpointInfo() {
37 : next_local_id_(kBootstrapEndpointId) {
40 bool Channel::Init(embedder::ScopedPlatformHandle handle) {
41 DCHECK(creation_thread_checker_.CalledOnValidThread());
43 // No need to take |lock_|, since this must be called before this object
44 // becomes thread-safe.
45 DCHECK(!raw_channel_.get());
48 RawChannel::Create(handle.Pass(), this, base::MessageLoop::current()));
49 if (!raw_channel_->Init()) {
57 void Channel::Shutdown() {
58 DCHECK(creation_thread_checker_.CalledOnValidThread());
60 base::AutoLock locker(lock_);
61 DCHECK(raw_channel_.get());
62 raw_channel_->Shutdown();
65 // TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that
69 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
70 scoped_refptr<MessagePipe> message_pipe, unsigned port) {
71 MessageInTransit::EndpointId local_id;
73 base::AutoLock locker(lock_);
75 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
76 local_id_to_endpoint_info_map_.find(next_local_id_) !=
77 local_id_to_endpoint_info_map_.end())
80 local_id = next_local_id_;
83 // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
84 // some expensive reference count increment/decrements.) Once this is done,
85 // we should be able to delete |EndpointInfo|'s default constructor.
86 local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
89 message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
93 void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
94 MessageInTransit::EndpointId remote_id) {
95 EndpointInfo endpoint_info;
97 base::AutoLock locker(lock_);
99 IdToEndpointInfoMap::const_iterator it =
100 local_id_to_endpoint_info_map_.find(local_id);
101 CHECK(it != local_id_to_endpoint_info_map_.end());
102 endpoint_info = it->second;
105 endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
108 bool Channel::WriteMessage(MessageInTransit* message) {
109 base::AutoLock locker(lock_);
110 if (!raw_channel_.get()) {
111 // TODO(vtl): I think this is probably not an error condition, but I should
112 // think about it (and the shutdown sequence) more carefully.
113 LOG(WARNING) << "WriteMessage() after shutdown";
117 return raw_channel_->WriteMessage(message);
120 void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
121 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
123 base::AutoLock locker_(lock_);
124 local_id_to_endpoint_info_map_.erase(local_id);
127 Channel::~Channel() {
128 // The channel should have been shut down first.
129 DCHECK(!raw_channel_.get());
131 DLOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
132 << "Destroying Channel with " << local_id_to_endpoint_info_map_.size()
133 << " endpoints still present";
136 void Channel::OnReadMessage(const MessageInTransit& message) {
137 switch (message.type()) {
138 case MessageInTransit::kTypeMessagePipeEndpoint:
139 case MessageInTransit::kTypeMessagePipe:
140 OnReadMessageForDownstream(message);
142 case MessageInTransit::kTypeChannel:
143 OnReadMessageForChannel(message);
146 HandleRemoteError(base::StringPrintf(
147 "Received message of invalid type %u",
148 static_cast<unsigned>(message.type())));
153 void Channel::OnFatalError(FatalError fatal_error) {
154 // TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
158 void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
159 DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
160 message.type() == MessageInTransit::kTypeMessagePipe);
162 MessageInTransit::EndpointId local_id = message.destination_id();
163 if (local_id == MessageInTransit::kInvalidEndpointId) {
164 HandleRemoteError("Received message with no destination ID");
168 EndpointInfo endpoint_info;
170 base::AutoLock locker(lock_);
172 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
173 // be called from the creation thread, |raw_channel_| should never be null
175 DCHECK(raw_channel_.get());
177 IdToEndpointInfoMap::const_iterator it =
178 local_id_to_endpoint_info_map_.find(local_id);
179 if (it == local_id_to_endpoint_info_map_.end()) {
180 HandleRemoteError(base::StringPrintf(
181 "Received a message for nonexistent local destination ID %u",
182 static_cast<unsigned>(local_id)));
183 // This is strongly indicative of some problem. However, it's not a fatal
184 // error, since it may indicate a bug (or hostile) remote process. Don't
185 // die even for Debug builds, since handling this properly needs to be
186 // tested (TODO(vtl)).
187 DLOG(ERROR) << "This should not happen under normal operation.";
190 endpoint_info = it->second;
193 // We need to duplicate the message, because |EnqueueMessage()| will take
195 // TODO(vtl): Need to enforce limits on message size and handle count.
196 MessageInTransit* own_message = MessageInTransit::Create(
197 message.type(), message.subtype(), message.bytes(), message.num_bytes(),
198 message.num_handles());
199 std::vector<DispatcherTransport> transports(message.num_handles());
200 // TODO(vtl): Create dispatchers for handles.
201 // TODO(vtl): It's bad that the current API will create equivalent dispatchers
202 // for the freshly-created ones, which is totally redundant. Make a version of
203 // |EnqueueMessage()| that passes ownership.
204 if (endpoint_info.message_pipe->EnqueueMessage(
205 MessagePipe::GetPeerPort(endpoint_info.port), own_message,
206 message.num_handles() ? &transports : NULL) != MOJO_RESULT_OK) {
207 HandleLocalError(base::StringPrintf(
208 "Failed to enqueue message to local destination ID %u",
209 static_cast<unsigned>(local_id)));
214 void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
215 // TODO(vtl): Currently no channel-only messages yet.
216 HandleRemoteError("Received invalid channel message");
220 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
221 // TODO(vtl): Is this how we really want to handle this?
222 LOG(WARNING) << error_message;
225 void Channel::HandleLocalError(const base::StringPiece& error_message) {
226 // TODO(vtl): Is this how we really want to handle this?
227 LOG(FATAL) << error_message;
230 } // namespace system