1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/edk/system/message_pipe.h"
7 #include "base/logging.h"
8 #include "mojo/edk/system/channel_endpoint.h"
9 #include "mojo/edk/system/local_message_pipe_endpoint.h"
10 #include "mojo/edk/system/message_in_transit.h"
11 #include "mojo/edk/system/message_pipe_dispatcher.h"
12 #include "mojo/edk/system/message_pipe_endpoint.h"
13 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
19 MessagePipe* MessagePipe::CreateLocalLocal() {
20 MessagePipe* message_pipe = new MessagePipe();
21 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
22 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
27 MessagePipe* MessagePipe::CreateLocalProxy(
28 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
29 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
30 MessagePipe* message_pipe = new MessagePipe();
31 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
32 *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
33 message_pipe->endpoints_[1].reset(
34 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
39 MessagePipe* MessagePipe::CreateProxyLocal(
40 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
41 DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
42 MessagePipe* message_pipe = new MessagePipe();
43 *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
44 message_pipe->endpoints_[0].reset(
45 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
46 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
51 unsigned MessagePipe::GetPeerPort(unsigned port) {
52 DCHECK(port == 0 || port == 1);
56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
57 DCHECK(port == 0 || port == 1);
58 base::AutoLock locker(lock_);
59 DCHECK(endpoints_[port]);
61 return endpoints_[port]->GetType();
64 void MessagePipe::CancelAllWaiters(unsigned port) {
65 DCHECK(port == 0 || port == 1);
67 base::AutoLock locker(lock_);
68 DCHECK(endpoints_[port]);
69 endpoints_[port]->CancelAllWaiters();
72 void MessagePipe::Close(unsigned port) {
73 DCHECK(port == 0 || port == 1);
75 unsigned destination_port = GetPeerPort(port);
77 base::AutoLock locker(lock_);
78 // The endpoint's |OnPeerClose()| may have been called first and returned
79 // false, which would have resulted in its destruction.
80 if (!endpoints_[port])
83 endpoints_[port]->Close();
84 if (endpoints_[destination_port]) {
85 if (!endpoints_[destination_port]->OnPeerClose())
86 endpoints_[destination_port].reset();
88 endpoints_[port].reset();
91 // TODO(vtl): Handle flags.
92 MojoResult MessagePipe::WriteMessage(
94 UserPointer<const void> bytes,
96 std::vector<DispatcherTransport>* transports,
97 MojoWriteMessageFlags flags) {
98 DCHECK(port == 0 || port == 1);
99 return EnqueueMessageInternal(
101 make_scoped_ptr(new MessageInTransit(
102 MessageInTransit::kTypeMessagePipeEndpoint,
103 MessageInTransit::kSubtypeMessagePipeEndpointData,
109 MojoResult MessagePipe::ReadMessage(unsigned port,
110 UserPointer<void> bytes,
111 UserPointer<uint32_t> num_bytes,
112 DispatcherVector* dispatchers,
113 uint32_t* num_dispatchers,
114 MojoReadMessageFlags flags) {
115 DCHECK(port == 0 || port == 1);
117 base::AutoLock locker(lock_);
118 DCHECK(endpoints_[port]);
120 return endpoints_[port]->ReadMessage(
121 bytes, num_bytes, dispatchers, num_dispatchers, flags);
124 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
125 DCHECK(port == 0 || port == 1);
127 base::AutoLock locker(const_cast<base::Lock&>(lock_));
128 DCHECK(endpoints_[port]);
130 return endpoints_[port]->GetHandleSignalsState();
133 MojoResult MessagePipe::AddWaiter(unsigned port,
135 MojoHandleSignals signals,
137 HandleSignalsState* signals_state) {
138 DCHECK(port == 0 || port == 1);
140 base::AutoLock locker(lock_);
141 DCHECK(endpoints_[port]);
143 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
146 void MessagePipe::RemoveWaiter(unsigned port,
148 HandleSignalsState* signals_state) {
149 DCHECK(port == 0 || port == 1);
151 base::AutoLock locker(lock_);
152 DCHECK(endpoints_[port]);
154 endpoints_[port]->RemoveWaiter(waiter, signals_state);
157 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
158 DCHECK(port == 0 || port == 1);
160 base::AutoLock locker(lock_);
161 DCHECK(endpoints_[port]);
162 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
164 // The local peer is already closed, so just make a |ChannelEndpoint| that'll
165 // send the already-queued messages.
166 if (!endpoints_[GetPeerPort(port)]) {
167 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
170 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
172 endpoints_[port]->Close();
173 endpoints_[port].reset();
174 return channel_endpoint;
177 // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
178 // |MessagePipe| with two proxy endpoints, which will then act as a proxy
179 // (rather than trying to connect the two ends directly).
181 endpoints_[GetPeerPort(port)]->GetType() !=
182 MessagePipeEndpoint::kTypeLocal)
183 << "Direct message pipe passing across multiple channels not yet "
184 "implemented; will proxy";
186 scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
187 scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
190 static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get())
192 endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
193 old_endpoint->Close();
195 return channel_endpoint;
198 MojoResult MessagePipe::EnqueueMessage(unsigned port,
199 scoped_ptr<MessageInTransit> message) {
200 return EnqueueMessageInternal(port, message.Pass(), nullptr);
203 MessagePipe::MessagePipe() {
206 MessagePipe::~MessagePipe() {
207 // Owned by the dispatchers. The owning dispatchers should only release us via
208 // their |Close()| method, which should inform us of being closed via our
209 // |Close()|. Thus these should already be null.
210 DCHECK(!endpoints_[0]);
211 DCHECK(!endpoints_[1]);
214 MojoResult MessagePipe::EnqueueMessageInternal(
216 scoped_ptr<MessageInTransit> message,
217 std::vector<DispatcherTransport>* transports) {
218 DCHECK(port == 0 || port == 1);
221 if (message->type() == MessageInTransit::kTypeMessagePipe) {
223 return HandleControlMessage(port, message.Pass());
226 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
228 base::AutoLock locker(lock_);
229 DCHECK(endpoints_[GetPeerPort(port)]);
231 // The destination port need not be open, unlike the source port.
232 if (!endpoints_[port])
233 return MOJO_RESULT_FAILED_PRECONDITION;
236 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
237 if (result != MOJO_RESULT_OK)
241 // The endpoint's |EnqueueMessage()| may not report failure.
242 endpoints_[port]->EnqueueMessage(message.Pass());
243 return MOJO_RESULT_OK;
246 MojoResult MessagePipe::AttachTransportsNoLock(
248 MessageInTransit* message,
249 std::vector<DispatcherTransport>* transports) {
250 DCHECK(!message->has_dispatchers());
252 // You're not allowed to send either handle to a message pipe over the message
253 // pipe, so check for this. (The case of trying to write a handle to itself is
254 // taken care of by |Core|. That case kind of makes sense, but leads to
255 // complications if, e.g., both sides try to do the same thing with their
256 // respective handles simultaneously. The other case, of trying to write the
257 // peer handle to a handle, doesn't make sense -- since no handle will be
258 // available to read the message from.)
259 for (size_t i = 0; i < transports->size(); i++) {
260 if (!(*transports)[i].is_valid())
262 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
263 MessagePipeDispatcherTransport mp_transport((*transports)[i]);
264 if (mp_transport.GetMessagePipe() == this) {
265 // The other case should have been disallowed by |Core|. (Note: |port|
266 // is the peer port of the handle given to |WriteMessage()|.)
267 DCHECK_EQ(mp_transport.GetPort(), port);
268 return MOJO_RESULT_INVALID_ARGUMENT;
273 // Clone the dispatchers and attach them to the message. (This must be done as
274 // a separate loop, since we want to leave the dispatchers alone on failure.)
275 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
276 dispatchers->reserve(transports->size());
277 for (size_t i = 0; i < transports->size(); i++) {
278 if ((*transports)[i].is_valid()) {
279 dispatchers->push_back(
280 (*transports)[i].CreateEquivalentDispatcherAndClose());
282 LOG(WARNING) << "Enqueueing null dispatcher";
283 dispatchers->push_back(scoped_refptr<Dispatcher>());
286 message->SetDispatchers(dispatchers.Pass());
287 return MOJO_RESULT_OK;
290 MojoResult MessagePipe::HandleControlMessage(
292 scoped_ptr<MessageInTransit> message) {
293 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
294 << message->subtype();
295 return MOJO_RESULT_UNKNOWN;
298 } // namespace system