Upstream version 11.40.271.0
[platform/framework/web/crosswalk.git] / src / mojo / edk / system / message_pipe.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/message_pipe.h"
6
7 #include "base/logging.h"
8 #include "mojo/edk/system/channel_endpoint.h"
9 #include "mojo/edk/system/local_message_pipe_endpoint.h"
10 #include "mojo/edk/system/message_in_transit.h"
11 #include "mojo/edk/system/message_pipe_dispatcher.h"
12 #include "mojo/edk/system/message_pipe_endpoint.h"
13 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
14
15 namespace mojo {
16 namespace system {
17
18 // static
19 MessagePipe* MessagePipe::CreateLocalLocal() {
20   MessagePipe* message_pipe = new MessagePipe();
21   message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
22   message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
23   return message_pipe;
24 }
25
26 // static
27 MessagePipe* MessagePipe::CreateLocalProxy(
28     scoped_refptr<ChannelEndpoint>* channel_endpoint) {
29   DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
30   MessagePipe* message_pipe = new MessagePipe();
31   message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
32   *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
33   message_pipe->endpoints_[1].reset(
34       new ProxyMessagePipeEndpoint(channel_endpoint->get()));
35   return message_pipe;
36 }
37
38 // static
39 MessagePipe* MessagePipe::CreateProxyLocal(
40     scoped_refptr<ChannelEndpoint>* channel_endpoint) {
41   DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
42   MessagePipe* message_pipe = new MessagePipe();
43   *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
44   message_pipe->endpoints_[0].reset(
45       new ProxyMessagePipeEndpoint(channel_endpoint->get()));
46   message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
47   return message_pipe;
48 }
49
50 // static
51 unsigned MessagePipe::GetPeerPort(unsigned port) {
52   DCHECK(port == 0 || port == 1);
53   return port ^ 1;
54 }
55
56 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
57   DCHECK(port == 0 || port == 1);
58   base::AutoLock locker(lock_);
59   DCHECK(endpoints_[port]);
60
61   return endpoints_[port]->GetType();
62 }
63
64 void MessagePipe::CancelAllWaiters(unsigned port) {
65   DCHECK(port == 0 || port == 1);
66
67   base::AutoLock locker(lock_);
68   DCHECK(endpoints_[port]);
69   endpoints_[port]->CancelAllWaiters();
70 }
71
72 void MessagePipe::Close(unsigned port) {
73   DCHECK(port == 0 || port == 1);
74
75   unsigned destination_port = GetPeerPort(port);
76
77   base::AutoLock locker(lock_);
78   // The endpoint's |OnPeerClose()| may have been called first and returned
79   // false, which would have resulted in its destruction.
80   if (!endpoints_[port])
81     return;
82
83   endpoints_[port]->Close();
84   if (endpoints_[destination_port]) {
85     if (!endpoints_[destination_port]->OnPeerClose())
86       endpoints_[destination_port].reset();
87   }
88   endpoints_[port].reset();
89 }
90
91 // TODO(vtl): Handle flags.
92 MojoResult MessagePipe::WriteMessage(
93     unsigned port,
94     UserPointer<const void> bytes,
95     uint32_t num_bytes,
96     std::vector<DispatcherTransport>* transports,
97     MojoWriteMessageFlags flags) {
98   DCHECK(port == 0 || port == 1);
99   return EnqueueMessageInternal(
100       GetPeerPort(port),
101       make_scoped_ptr(new MessageInTransit(
102           MessageInTransit::kTypeMessagePipeEndpoint,
103           MessageInTransit::kSubtypeMessagePipeEndpointData,
104           num_bytes,
105           bytes)),
106       transports);
107 }
108
109 MojoResult MessagePipe::ReadMessage(unsigned port,
110                                     UserPointer<void> bytes,
111                                     UserPointer<uint32_t> num_bytes,
112                                     DispatcherVector* dispatchers,
113                                     uint32_t* num_dispatchers,
114                                     MojoReadMessageFlags flags) {
115   DCHECK(port == 0 || port == 1);
116
117   base::AutoLock locker(lock_);
118   DCHECK(endpoints_[port]);
119
120   return endpoints_[port]->ReadMessage(
121       bytes, num_bytes, dispatchers, num_dispatchers, flags);
122 }
123
124 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
125   DCHECK(port == 0 || port == 1);
126
127   base::AutoLock locker(const_cast<base::Lock&>(lock_));
128   DCHECK(endpoints_[port]);
129
130   return endpoints_[port]->GetHandleSignalsState();
131 }
132
133 MojoResult MessagePipe::AddWaiter(unsigned port,
134                                   Waiter* waiter,
135                                   MojoHandleSignals signals,
136                                   uint32_t context,
137                                   HandleSignalsState* signals_state) {
138   DCHECK(port == 0 || port == 1);
139
140   base::AutoLock locker(lock_);
141   DCHECK(endpoints_[port]);
142
143   return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
144 }
145
146 void MessagePipe::RemoveWaiter(unsigned port,
147                                Waiter* waiter,
148                                HandleSignalsState* signals_state) {
149   DCHECK(port == 0 || port == 1);
150
151   base::AutoLock locker(lock_);
152   DCHECK(endpoints_[port]);
153
154   endpoints_[port]->RemoveWaiter(waiter, signals_state);
155 }
156
157 scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
158   DCHECK(port == 0 || port == 1);
159
160   base::AutoLock locker(lock_);
161   DCHECK(endpoints_[port]);
162   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
163
164   // The local peer is already closed, so just make a |ChannelEndpoint| that'll
165   // send the already-queued messages.
166   if (!endpoints_[GetPeerPort(port)]) {
167     scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
168         nullptr,
169         0,
170         static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
171             ->message_queue()));
172     endpoints_[port]->Close();
173     endpoints_[port].reset();
174     return channel_endpoint;
175   }
176
177   // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
178   // |MessagePipe| with two proxy endpoints, which will then act as a proxy
179   // (rather than trying to connect the two ends directly).
180   DLOG_IF(WARNING,
181           endpoints_[GetPeerPort(port)]->GetType() !=
182               MessagePipeEndpoint::kTypeLocal)
183       << "Direct message pipe passing across multiple channels not yet "
184          "implemented; will proxy";
185
186   scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
187   scoped_refptr<ChannelEndpoint> channel_endpoint(new ChannelEndpoint(
188       this,
189       port,
190       static_cast<LocalMessagePipeEndpoint*>(old_endpoint.get())
191           ->message_queue()));
192   endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
193   old_endpoint->Close();
194
195   return channel_endpoint;
196 }
197
198 MojoResult MessagePipe::EnqueueMessage(unsigned port,
199                                        scoped_ptr<MessageInTransit> message) {
200   return EnqueueMessageInternal(port, message.Pass(), nullptr);
201 }
202
203 MessagePipe::MessagePipe() {
204 }
205
206 MessagePipe::~MessagePipe() {
207   // Owned by the dispatchers. The owning dispatchers should only release us via
208   // their |Close()| method, which should inform us of being closed via our
209   // |Close()|. Thus these should already be null.
210   DCHECK(!endpoints_[0]);
211   DCHECK(!endpoints_[1]);
212 }
213
214 MojoResult MessagePipe::EnqueueMessageInternal(
215     unsigned port,
216     scoped_ptr<MessageInTransit> message,
217     std::vector<DispatcherTransport>* transports) {
218   DCHECK(port == 0 || port == 1);
219   DCHECK(message);
220
221   if (message->type() == MessageInTransit::kTypeMessagePipe) {
222     DCHECK(!transports);
223     return HandleControlMessage(port, message.Pass());
224   }
225
226   DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
227
228   base::AutoLock locker(lock_);
229   DCHECK(endpoints_[GetPeerPort(port)]);
230
231   // The destination port need not be open, unlike the source port.
232   if (!endpoints_[port])
233     return MOJO_RESULT_FAILED_PRECONDITION;
234
235   if (transports) {
236     MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
237     if (result != MOJO_RESULT_OK)
238       return result;
239   }
240
241   // The endpoint's |EnqueueMessage()| may not report failure.
242   endpoints_[port]->EnqueueMessage(message.Pass());
243   return MOJO_RESULT_OK;
244 }
245
246 MojoResult MessagePipe::AttachTransportsNoLock(
247     unsigned port,
248     MessageInTransit* message,
249     std::vector<DispatcherTransport>* transports) {
250   DCHECK(!message->has_dispatchers());
251
252   // You're not allowed to send either handle to a message pipe over the message
253   // pipe, so check for this. (The case of trying to write a handle to itself is
254   // taken care of by |Core|. That case kind of makes sense, but leads to
255   // complications if, e.g., both sides try to do the same thing with their
256   // respective handles simultaneously. The other case, of trying to write the
257   // peer handle to a handle, doesn't make sense -- since no handle will be
258   // available to read the message from.)
259   for (size_t i = 0; i < transports->size(); i++) {
260     if (!(*transports)[i].is_valid())
261       continue;
262     if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
263       MessagePipeDispatcherTransport mp_transport((*transports)[i]);
264       if (mp_transport.GetMessagePipe() == this) {
265         // The other case should have been disallowed by |Core|. (Note: |port|
266         // is the peer port of the handle given to |WriteMessage()|.)
267         DCHECK_EQ(mp_transport.GetPort(), port);
268         return MOJO_RESULT_INVALID_ARGUMENT;
269       }
270     }
271   }
272
273   // Clone the dispatchers and attach them to the message. (This must be done as
274   // a separate loop, since we want to leave the dispatchers alone on failure.)
275   scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
276   dispatchers->reserve(transports->size());
277   for (size_t i = 0; i < transports->size(); i++) {
278     if ((*transports)[i].is_valid()) {
279       dispatchers->push_back(
280           (*transports)[i].CreateEquivalentDispatcherAndClose());
281     } else {
282       LOG(WARNING) << "Enqueueing null dispatcher";
283       dispatchers->push_back(scoped_refptr<Dispatcher>());
284     }
285   }
286   message->SetDispatchers(dispatchers.Pass());
287   return MOJO_RESULT_OK;
288 }
289
290 MojoResult MessagePipe::HandleControlMessage(
291     unsigned /*port*/,
292     scoped_ptr<MessageInTransit> message) {
293   LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
294                << message->subtype();
295   return MOJO_RESULT_UNKNOWN;
296 }
297
298 }  // namespace system
299 }  // namespace mojo