1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/message_pipe_dispatcher.h"
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/constants.h"
10 #include "mojo/system/local_message_pipe_endpoint.h"
11 #include "mojo/system/memory.h"
12 #include "mojo/system/message_in_transit.h"
13 #include "mojo/system/message_pipe.h"
14 #include "mojo/system/proxy_message_pipe_endpoint.h"
21 const unsigned kInvalidPort = static_cast<unsigned>(-1);
23 struct SerializedMessagePipeDispatcher {
24 MessageInTransit::EndpointId endpoint_id;
29 // MessagePipeDispatcher -------------------------------------------------------
31 MessagePipeDispatcher::MessagePipeDispatcher()
32 : port_(kInvalidPort) {
35 void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
37 DCHECK(message_pipe.get());
38 DCHECK(port == 0 || port == 1);
40 message_pipe_ = message_pipe;
44 Dispatcher::Type MessagePipeDispatcher::GetType() const {
45 return kTypeMessagePipe;
49 std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
50 MessagePipeDispatcher::CreateRemoteMessagePipe() {
51 scoped_refptr<MessagePipe> message_pipe(
53 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
54 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
55 scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher());
56 dispatcher->Init(message_pipe, 0);
58 return std::make_pair(dispatcher, message_pipe);
62 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
66 if (size != sizeof(SerializedMessagePipeDispatcher)) {
67 LOG(ERROR) << "Invalid serialized message pipe dispatcher";
68 return scoped_refptr<MessagePipeDispatcher>();
71 std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
72 remote_message_pipe = CreateRemoteMessagePipe();
74 MessageInTransit::EndpointId remote_id =
75 static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
76 if (remote_id == MessageInTransit::kInvalidEndpointId) {
77 // This means that the other end was closed, and there were no messages
79 // TODO(vtl): This is wrong. We should produce a "dead" message pipe
82 return scoped_refptr<MessagePipeDispatcher>();
84 MessageInTransit::EndpointId local_id =
85 channel->AttachMessagePipeEndpoint(remote_message_pipe.second, 1);
86 if (local_id == MessageInTransit::kInvalidEndpointId) {
87 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
88 "attach; remote ID = " << remote_id << ")";
89 return scoped_refptr<MessagePipeDispatcher>();
91 DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = "
92 << remote_id << ", new local ID = " << local_id << ")";
94 if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) {
95 // In general, this shouldn't fail, since we generated |local_id| locally.
97 return scoped_refptr<MessagePipeDispatcher>();
100 // TODO(vtl): FIXME -- Need some error handling here.
101 channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
102 return remote_message_pipe.first;
105 MessagePipeDispatcher::~MessagePipeDispatcher() {
106 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
107 DCHECK(!message_pipe_.get());
110 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
111 lock().AssertAcquired();
112 return message_pipe_.get();
115 unsigned MessagePipeDispatcher::GetPortNoLock() const {
116 lock().AssertAcquired();
120 void MessagePipeDispatcher::CancelAllWaitersNoLock() {
121 lock().AssertAcquired();
122 message_pipe_->CancelAllWaiters(port_);
125 void MessagePipeDispatcher::CloseImplNoLock() {
126 lock().AssertAcquired();
127 message_pipe_->Close(port_);
128 message_pipe_ = NULL;
129 port_ = kInvalidPort;
132 scoped_refptr<Dispatcher>
133 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
134 lock().AssertAcquired();
136 scoped_refptr<MessagePipeDispatcher> rv = new MessagePipeDispatcher();
137 rv->Init(message_pipe_, port_);
138 message_pipe_ = NULL;
139 port_ = kInvalidPort;
140 return scoped_refptr<Dispatcher>(rv.get());
143 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
146 std::vector<DispatcherTransport>* transports,
147 MojoWriteMessageFlags flags) {
148 DCHECK(!transports || (transports->size() > 0 &&
149 transports->size() <= kMaxMessageNumHandles));
151 lock().AssertAcquired();
153 if (!VerifyUserPointer<void>(bytes, num_bytes))
154 return MOJO_RESULT_INVALID_ARGUMENT;
155 if (num_bytes > kMaxMessageNumBytes)
156 return MOJO_RESULT_RESOURCE_EXHAUSTED;
158 return message_pipe_->WriteMessage(port_, bytes, num_bytes, transports,
162 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
165 DispatcherVector* dispatchers,
166 uint32_t* num_dispatchers,
167 MojoReadMessageFlags flags) {
168 lock().AssertAcquired();
171 if (!VerifyUserPointer<uint32_t>(num_bytes, 1))
172 return MOJO_RESULT_INVALID_ARGUMENT;
173 if (!VerifyUserPointer<void>(bytes, *num_bytes))
174 return MOJO_RESULT_INVALID_ARGUMENT;
177 return message_pipe_->ReadMessage(port_, bytes, num_bytes, dispatchers,
178 num_dispatchers, flags);
181 MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(Waiter* waiter,
183 MojoResult wake_result) {
184 lock().AssertAcquired();
185 return message_pipe_->AddWaiter(port_, waiter, flags, wake_result);
188 void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
189 lock().AssertAcquired();
190 message_pipe_->RemoveWaiter(port_, waiter);
193 void MessagePipeDispatcher::StartSerializeImplNoLock(
194 Channel* /*channel*/,
196 size_t* max_platform_handles) {
197 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
198 *max_size = sizeof(SerializedMessagePipeDispatcher);
199 *max_platform_handles = 0;
202 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
206 std::vector<embedder::PlatformHandle>* platform_handles) {
207 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
209 // Convert the local endpoint to a proxy endpoint (moving the message queue).
210 message_pipe_->ConvertLocalToProxy(port_);
212 // Attach the new proxy endpoint to the channel.
213 MessageInTransit::EndpointId endpoint_id =
214 channel->AttachMessagePipeEndpoint(message_pipe_, port_);
215 // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
216 // possible that the other endpoint -- the one that we're not sending -- was
217 // closed in the intervening time.) In that case, we need to deserialize a
218 // "dead" message pipe dispatcher on the other end. (Note that this is
219 // different from just producing |MOJO_HANDLE_INVALID|.)
220 DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
223 // We now have a local ID. Before we can run the proxy endpoint, we need to
224 // get an ack back from the other side with the remote ID.
225 static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
228 message_pipe_ = NULL;
229 port_ = kInvalidPort;
231 *actual_size = sizeof(SerializedMessagePipeDispatcher);
235 // MessagePipeDispatcherTransport ----------------------------------------------
237 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
238 DispatcherTransport transport) : DispatcherTransport(transport) {
239 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
242 } // namespace system