1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/message_pipe.h"
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
19 scoped_ptr<MessagePipeEndpoint> endpoint1) {
20 endpoints_[0].reset(endpoint0.release());
21 endpoints_[1].reset(endpoint1.release());
24 MessagePipe::MessagePipe() {
25 endpoints_[0].reset(new LocalMessagePipeEndpoint());
26 endpoints_[1].reset(new LocalMessagePipeEndpoint());
30 unsigned MessagePipe::GetPeerPort(unsigned port) {
31 DCHECK(port == 0 || port == 1);
35 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
36 DCHECK(port == 0 || port == 1);
37 base::AutoLock locker(lock_);
38 DCHECK(endpoints_[port]);
40 return endpoints_[port]->GetType();
43 void MessagePipe::CancelAllWaiters(unsigned port) {
44 DCHECK(port == 0 || port == 1);
46 base::AutoLock locker(lock_);
47 DCHECK(endpoints_[port]);
48 endpoints_[port]->CancelAllWaiters();
51 void MessagePipe::Close(unsigned port) {
52 DCHECK(port == 0 || port == 1);
54 unsigned destination_port = GetPeerPort(port);
56 base::AutoLock locker(lock_);
57 DCHECK(endpoints_[port]);
59 endpoints_[port]->Close();
60 if (endpoints_[destination_port]) {
61 if (!endpoints_[destination_port]->OnPeerClose())
62 endpoints_[destination_port].reset();
64 endpoints_[port].reset();
67 // TODO(vtl): Handle flags.
68 MojoResult MessagePipe::WriteMessage(
70 UserPointer<const void> bytes,
72 std::vector<DispatcherTransport>* transports,
73 MojoWriteMessageFlags flags) {
74 DCHECK(port == 0 || port == 1);
75 return EnqueueMessageInternal(
77 make_scoped_ptr(new MessageInTransit(
78 MessageInTransit::kTypeMessagePipeEndpoint,
79 MessageInTransit::kSubtypeMessagePipeEndpointData,
85 MojoResult MessagePipe::ReadMessage(unsigned port,
86 UserPointer<void> bytes,
87 UserPointer<uint32_t> num_bytes,
88 DispatcherVector* dispatchers,
89 uint32_t* num_dispatchers,
90 MojoReadMessageFlags flags) {
91 DCHECK(port == 0 || port == 1);
93 base::AutoLock locker(lock_);
94 DCHECK(endpoints_[port]);
96 return endpoints_[port]->ReadMessage(
97 bytes, num_bytes, dispatchers, num_dispatchers, flags);
100 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
101 DCHECK(port == 0 || port == 1);
103 base::AutoLock locker(const_cast<base::Lock&>(lock_));
104 DCHECK(endpoints_[port]);
106 return endpoints_[port]->GetHandleSignalsState();
109 MojoResult MessagePipe::AddWaiter(unsigned port,
111 MojoHandleSignals signals,
113 HandleSignalsState* signals_state) {
114 DCHECK(port == 0 || port == 1);
116 base::AutoLock locker(lock_);
117 DCHECK(endpoints_[port]);
119 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
122 void MessagePipe::RemoveWaiter(unsigned port,
124 HandleSignalsState* signals_state) {
125 DCHECK(port == 0 || port == 1);
127 base::AutoLock locker(lock_);
128 DCHECK(endpoints_[port]);
130 endpoints_[port]->RemoveWaiter(waiter, signals_state);
133 void MessagePipe::ConvertLocalToProxy(unsigned port) {
134 DCHECK(port == 0 || port == 1);
136 base::AutoLock locker(lock_);
137 DCHECK(endpoints_[port]);
138 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
140 bool is_peer_open = !!endpoints_[GetPeerPort(port)];
142 // TODO(vtl): Hopefully this will work if the peer has been closed and when
143 // the peer is local. If the peer is remote, we should do something more
145 DCHECK(!is_peer_open ||
146 endpoints_[GetPeerPort(port)]->GetType() ==
147 MessagePipeEndpoint::kTypeLocal);
149 scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
150 new ProxyMessagePipeEndpoint(
151 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
153 endpoints_[port].swap(replacement_endpoint);
156 MojoResult MessagePipe::EnqueueMessage(unsigned port,
157 scoped_ptr<MessageInTransit> message) {
158 return EnqueueMessageInternal(port, message.Pass(), NULL);
161 bool MessagePipe::Attach(unsigned port,
162 scoped_refptr<Channel> channel,
163 MessageInTransit::EndpointId local_id) {
164 DCHECK(port == 0 || port == 1);
166 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
168 base::AutoLock locker(lock_);
169 if (!endpoints_[port])
172 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
173 endpoints_[port]->Attach(channel, local_id);
177 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
178 DCHECK(port == 0 || port == 1);
179 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
181 base::AutoLock locker(lock_);
182 DCHECK(endpoints_[port]);
183 if (!endpoints_[port]->Run(remote_id))
184 endpoints_[port].reset();
187 void MessagePipe::OnRemove(unsigned port) {
188 unsigned destination_port = GetPeerPort(port);
190 base::AutoLock locker(lock_);
191 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
192 if (!endpoints_[port])
195 endpoints_[port]->OnRemove();
196 if (endpoints_[destination_port]) {
197 if (!endpoints_[destination_port]->OnPeerClose())
198 endpoints_[destination_port].reset();
200 endpoints_[port].reset();
203 MessagePipe::~MessagePipe() {
204 // Owned by the dispatchers. The owning dispatchers should only release us via
205 // their |Close()| method, which should inform us of being closed via our
206 // |Close()|. Thus these should already be null.
207 DCHECK(!endpoints_[0]);
208 DCHECK(!endpoints_[1]);
211 MojoResult MessagePipe::EnqueueMessageInternal(
213 scoped_ptr<MessageInTransit> message,
214 std::vector<DispatcherTransport>* transports) {
215 DCHECK(port == 0 || port == 1);
218 if (message->type() == MessageInTransit::kTypeMessagePipe) {
220 return HandleControlMessage(port, message.Pass());
223 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
225 base::AutoLock locker(lock_);
226 DCHECK(endpoints_[GetPeerPort(port)]);
228 // The destination port need not be open, unlike the source port.
229 if (!endpoints_[port])
230 return MOJO_RESULT_FAILED_PRECONDITION;
233 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
234 if (result != MOJO_RESULT_OK)
238 // The endpoint's |EnqueueMessage()| may not report failure.
239 endpoints_[port]->EnqueueMessage(message.Pass());
240 return MOJO_RESULT_OK;
243 MojoResult MessagePipe::AttachTransportsNoLock(
245 MessageInTransit* message,
246 std::vector<DispatcherTransport>* transports) {
247 DCHECK(!message->has_dispatchers());
249 // You're not allowed to send either handle to a message pipe over the message
250 // pipe, so check for this. (The case of trying to write a handle to itself is
251 // taken care of by |Core|. That case kind of makes sense, but leads to
252 // complications if, e.g., both sides try to do the same thing with their
253 // respective handles simultaneously. The other case, of trying to write the
254 // peer handle to a handle, doesn't make sense -- since no handle will be
255 // available to read the message from.)
256 for (size_t i = 0; i < transports->size(); i++) {
257 if (!(*transports)[i].is_valid())
259 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
260 MessagePipeDispatcherTransport mp_transport((*transports)[i]);
261 if (mp_transport.GetMessagePipe() == this) {
262 // The other case should have been disallowed by |Core|. (Note: |port|
263 // is the peer port of the handle given to |WriteMessage()|.)
264 DCHECK_EQ(mp_transport.GetPort(), port);
265 return MOJO_RESULT_INVALID_ARGUMENT;
270 // Clone the dispatchers and attach them to the message. (This must be done as
271 // a separate loop, since we want to leave the dispatchers alone on failure.)
272 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
273 dispatchers->reserve(transports->size());
274 for (size_t i = 0; i < transports->size(); i++) {
275 if ((*transports)[i].is_valid()) {
276 dispatchers->push_back(
277 (*transports)[i].CreateEquivalentDispatcherAndClose());
279 LOG(WARNING) << "Enqueueing null dispatcher";
280 dispatchers->push_back(scoped_refptr<Dispatcher>());
283 message->SetDispatchers(dispatchers.Pass());
284 return MOJO_RESULT_OK;
287 MojoResult MessagePipe::HandleControlMessage(
289 scoped_ptr<MessageInTransit> message) {
290 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
291 << message->subtype();
292 return MOJO_RESULT_UNKNOWN;
295 } // namespace system