1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/edk/system/message_pipe_dispatcher.h"
7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/channel_endpoint.h"
10 #include "mojo/edk/system/channel_endpoint_id.h"
11 #include "mojo/edk/system/constants.h"
12 #include "mojo/edk/system/local_message_pipe_endpoint.h"
13 #include "mojo/edk/system/memory.h"
14 #include "mojo/edk/system/message_pipe.h"
15 #include "mojo/edk/system/options_validation.h"
16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
23 const unsigned kInvalidPort = static_cast<unsigned>(-1);
25 struct SerializedMessagePipeDispatcher {
26 // This is the endpoint ID on the receiving side, and should be a "remote ID".
27 // (The receiving side should have already have an endpoint attached and run
28 // via the |Channel|s. This endpoint will have both IDs assigned, so this ID
29 // is only needed to associated that endpoint with a particular dispatcher.)
30 ChannelEndpointId receiver_endpoint_id;
35 // MessagePipeDispatcher -------------------------------------------------------
38 const MojoCreateMessagePipeOptions
39 MessagePipeDispatcher::kDefaultCreateOptions = {
40 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
41 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
43 MessagePipeDispatcher::MessagePipeDispatcher(
44 const MojoCreateMessagePipeOptions& /*validated_options*/)
45 : port_(kInvalidPort) {
49 MojoResult MessagePipeDispatcher::ValidateCreateOptions(
50 UserPointer<const MojoCreateMessagePipeOptions> in_options,
51 MojoCreateMessagePipeOptions* out_options) {
52 const MojoCreateMessagePipeOptionsFlags kKnownFlags =
53 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
55 *out_options = kDefaultCreateOptions;
56 if (in_options.IsNull())
57 return MOJO_RESULT_OK;
59 UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
60 if (!reader.is_valid())
61 return MOJO_RESULT_INVALID_ARGUMENT;
63 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
64 return MOJO_RESULT_OK;
65 if ((reader.options().flags & ~kKnownFlags))
66 return MOJO_RESULT_UNIMPLEMENTED;
67 out_options->flags = reader.options().flags;
69 // Checks for fields beyond |flags|:
71 // (Nothing here yet.)
73 return MOJO_RESULT_OK;
76 void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
78 DCHECK(message_pipe.get());
79 DCHECK(port == 0 || port == 1);
81 message_pipe_ = message_pipe;
85 Dispatcher::Type MessagePipeDispatcher::GetType() const {
86 return kTypeMessagePipe;
90 scoped_refptr<MessagePipeDispatcher>
91 MessagePipeDispatcher::CreateRemoteMessagePipe(
92 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
93 scoped_refptr<MessagePipe> message_pipe(
94 MessagePipe::CreateLocalProxy(channel_endpoint));
95 scoped_refptr<MessagePipeDispatcher> dispatcher(
96 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
97 dispatcher->Init(message_pipe, 0);
102 scoped_refptr<MessagePipeDispatcher> MessagePipeDispatcher::Deserialize(
106 if (size != sizeof(SerializedMessagePipeDispatcher)) {
107 LOG(ERROR) << "Invalid serialized message pipe dispatcher";
108 return scoped_refptr<MessagePipeDispatcher>();
111 const SerializedMessagePipeDispatcher* s =
112 static_cast<const SerializedMessagePipeDispatcher*>(source);
113 scoped_refptr<MessagePipe> message_pipe =
114 channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
115 if (!message_pipe.get()) {
116 LOG(ERROR) << "Failed to deserialize message pipe dispatcher (ID = "
117 << s->receiver_endpoint_id << ")";
118 return scoped_refptr<MessagePipeDispatcher>();
121 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
122 << s->receiver_endpoint_id << ")";
123 scoped_refptr<MessagePipeDispatcher> dispatcher(
124 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
125 dispatcher->Init(message_pipe, 0);
129 MessagePipeDispatcher::~MessagePipeDispatcher() {
130 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
131 DCHECK(!message_pipe_.get());
134 MessagePipe* MessagePipeDispatcher::GetMessagePipeNoLock() const {
135 lock().AssertAcquired();
136 return message_pipe_.get();
139 unsigned MessagePipeDispatcher::GetPortNoLock() const {
140 lock().AssertAcquired();
144 void MessagePipeDispatcher::CancelAllWaitersNoLock() {
145 lock().AssertAcquired();
146 message_pipe_->CancelAllWaiters(port_);
149 void MessagePipeDispatcher::CloseImplNoLock() {
150 lock().AssertAcquired();
151 message_pipe_->Close(port_);
152 message_pipe_ = nullptr;
153 port_ = kInvalidPort;
156 scoped_refptr<Dispatcher>
157 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
158 lock().AssertAcquired();
160 // TODO(vtl): Currently, there are no options, so we just use
161 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
163 scoped_refptr<MessagePipeDispatcher> rv =
164 new MessagePipeDispatcher(kDefaultCreateOptions);
165 rv->Init(message_pipe_, port_);
166 message_pipe_ = nullptr;
167 port_ = kInvalidPort;
168 return scoped_refptr<Dispatcher>(rv.get());
171 MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
172 UserPointer<const void> bytes,
174 std::vector<DispatcherTransport>* transports,
175 MojoWriteMessageFlags flags) {
176 DCHECK(!transports || (transports->size() > 0 &&
177 transports->size() <= kMaxMessageNumHandles));
179 lock().AssertAcquired();
181 if (num_bytes > kMaxMessageNumBytes)
182 return MOJO_RESULT_RESOURCE_EXHAUSTED;
184 return message_pipe_->WriteMessage(
185 port_, bytes, num_bytes, transports, flags);
188 MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
189 UserPointer<void> bytes,
190 UserPointer<uint32_t> num_bytes,
191 DispatcherVector* dispatchers,
192 uint32_t* num_dispatchers,
193 MojoReadMessageFlags flags) {
194 lock().AssertAcquired();
195 return message_pipe_->ReadMessage(
196 port_, bytes, num_bytes, dispatchers, num_dispatchers, flags);
199 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
201 lock().AssertAcquired();
202 return message_pipe_->GetHandleSignalsState(port_);
205 MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(
207 MojoHandleSignals signals,
209 HandleSignalsState* signals_state) {
210 lock().AssertAcquired();
211 return message_pipe_->AddWaiter(
212 port_, waiter, signals, context, signals_state);
215 void MessagePipeDispatcher::RemoveWaiterImplNoLock(
217 HandleSignalsState* signals_state) {
218 lock().AssertAcquired();
219 message_pipe_->RemoveWaiter(port_, waiter, signals_state);
222 void MessagePipeDispatcher::StartSerializeImplNoLock(
223 Channel* /*channel*/,
225 size_t* max_platform_handles) {
226 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
227 *max_size = sizeof(SerializedMessagePipeDispatcher);
228 *max_platform_handles = 0;
231 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
235 embedder::PlatformHandleVector* /*platform_handles*/) {
236 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
238 SerializedMessagePipeDispatcher* s =
239 static_cast<SerializedMessagePipeDispatcher*>(destination);
241 // Convert the local endpoint to a proxy endpoint (moving the message queue)
242 // and attach it to the channel.
243 s->receiver_endpoint_id = channel->AttachAndRunEndpoint(
244 message_pipe_->ConvertLocalToProxy(port_), false);
245 DVLOG(2) << "Serializing message pipe dispatcher (remote ID = "
246 << s->receiver_endpoint_id << ")";
248 message_pipe_ = nullptr;
249 port_ = kInvalidPort;
251 *actual_size = sizeof(SerializedMessagePipeDispatcher);
255 // MessagePipeDispatcherTransport ----------------------------------------------
257 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
258 DispatcherTransport transport)
259 : DispatcherTransport(transport) {
260 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
263 } // namespace system