Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / mojo / system / message_pipe.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/message_pipe.h"
6
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
14
15 namespace mojo {
16 namespace system {
17
18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
19                          scoped_ptr<MessagePipeEndpoint> endpoint1) {
20   endpoints_[0].reset(endpoint0.release());
21   endpoints_[1].reset(endpoint1.release());
22 }
23
24 MessagePipe::MessagePipe() {
25   endpoints_[0].reset(new LocalMessagePipeEndpoint());
26   endpoints_[1].reset(new LocalMessagePipeEndpoint());
27 }
28
29 // static
30 unsigned MessagePipe::GetPeerPort(unsigned port) {
31   DCHECK(port == 0 || port == 1);
32   return port ^ 1;
33 }
34
35 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
36   DCHECK(port == 0 || port == 1);
37   base::AutoLock locker(lock_);
38   DCHECK(endpoints_[port]);
39
40   return endpoints_[port]->GetType();
41 }
42
43 void MessagePipe::CancelAllWaiters(unsigned port) {
44   DCHECK(port == 0 || port == 1);
45
46   base::AutoLock locker(lock_);
47   DCHECK(endpoints_[port]);
48   endpoints_[port]->CancelAllWaiters();
49 }
50
51 void MessagePipe::Close(unsigned port) {
52   DCHECK(port == 0 || port == 1);
53
54   unsigned destination_port = GetPeerPort(port);
55
56   base::AutoLock locker(lock_);
57   DCHECK(endpoints_[port]);
58
59   endpoints_[port]->Close();
60   if (endpoints_[destination_port]) {
61     if (!endpoints_[destination_port]->OnPeerClose())
62       endpoints_[destination_port].reset();
63   }
64   endpoints_[port].reset();
65 }
66
67 // TODO(vtl): Handle flags.
68 MojoResult MessagePipe::WriteMessage(
69     unsigned port,
70     UserPointer<const void> bytes,
71     uint32_t num_bytes,
72     std::vector<DispatcherTransport>* transports,
73     MojoWriteMessageFlags flags) {
74   DCHECK(port == 0 || port == 1);
75   return EnqueueMessageInternal(
76       GetPeerPort(port),
77       make_scoped_ptr(new MessageInTransit(
78           MessageInTransit::kTypeMessagePipeEndpoint,
79           MessageInTransit::kSubtypeMessagePipeEndpointData,
80           num_bytes,
81           bytes)),
82       transports);
83 }
84
85 MojoResult MessagePipe::ReadMessage(unsigned port,
86                                     UserPointer<void> bytes,
87                                     UserPointer<uint32_t> num_bytes,
88                                     DispatcherVector* dispatchers,
89                                     uint32_t* num_dispatchers,
90                                     MojoReadMessageFlags flags) {
91   DCHECK(port == 0 || port == 1);
92
93   base::AutoLock locker(lock_);
94   DCHECK(endpoints_[port]);
95
96   return endpoints_[port]->ReadMessage(
97       bytes, num_bytes, dispatchers, num_dispatchers, flags);
98 }
99
100 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
101   DCHECK(port == 0 || port == 1);
102
103   base::AutoLock locker(const_cast<base::Lock&>(lock_));
104   DCHECK(endpoints_[port]);
105
106   return endpoints_[port]->GetHandleSignalsState();
107 }
108
109 MojoResult MessagePipe::AddWaiter(unsigned port,
110                                   Waiter* waiter,
111                                   MojoHandleSignals signals,
112                                   uint32_t context,
113                                   HandleSignalsState* signals_state) {
114   DCHECK(port == 0 || port == 1);
115
116   base::AutoLock locker(lock_);
117   DCHECK(endpoints_[port]);
118
119   return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
120 }
121
122 void MessagePipe::RemoveWaiter(unsigned port,
123                                Waiter* waiter,
124                                HandleSignalsState* signals_state) {
125   DCHECK(port == 0 || port == 1);
126
127   base::AutoLock locker(lock_);
128   DCHECK(endpoints_[port]);
129
130   endpoints_[port]->RemoveWaiter(waiter, signals_state);
131 }
132
133 void MessagePipe::ConvertLocalToProxy(unsigned port) {
134   DCHECK(port == 0 || port == 1);
135
136   base::AutoLock locker(lock_);
137   DCHECK(endpoints_[port]);
138   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
139
140   bool is_peer_open = !!endpoints_[GetPeerPort(port)];
141
142   // TODO(vtl): Hopefully this will work if the peer has been closed and when
143   // the peer is local. If the peer is remote, we should do something more
144   // sophisticated.
145   DCHECK(!is_peer_open ||
146          endpoints_[GetPeerPort(port)]->GetType() ==
147              MessagePipeEndpoint::kTypeLocal);
148
149   scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
150       new ProxyMessagePipeEndpoint(
151           static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
152           is_peer_open));
153   endpoints_[port].swap(replacement_endpoint);
154 }
155
156 MojoResult MessagePipe::EnqueueMessage(unsigned port,
157                                        scoped_ptr<MessageInTransit> message) {
158   return EnqueueMessageInternal(port, message.Pass(), NULL);
159 }
160
161 bool MessagePipe::Attach(unsigned port,
162                          scoped_refptr<Channel> channel,
163                          MessageInTransit::EndpointId local_id) {
164   DCHECK(port == 0 || port == 1);
165   DCHECK(channel);
166   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
167
168   base::AutoLock locker(lock_);
169   if (!endpoints_[port])
170     return false;
171
172   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
173   endpoints_[port]->Attach(channel, local_id);
174   return true;
175 }
176
177 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
178   DCHECK(port == 0 || port == 1);
179   DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
180
181   base::AutoLock locker(lock_);
182   DCHECK(endpoints_[port]);
183   if (!endpoints_[port]->Run(remote_id))
184     endpoints_[port].reset();
185 }
186
187 void MessagePipe::OnRemove(unsigned port) {
188   unsigned destination_port = GetPeerPort(port);
189
190   base::AutoLock locker(lock_);
191   // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
192   if (!endpoints_[port])
193     return;
194
195   endpoints_[port]->OnRemove();
196   if (endpoints_[destination_port]) {
197     if (!endpoints_[destination_port]->OnPeerClose())
198       endpoints_[destination_port].reset();
199   }
200   endpoints_[port].reset();
201 }
202
203 MessagePipe::~MessagePipe() {
204   // Owned by the dispatchers. The owning dispatchers should only release us via
205   // their |Close()| method, which should inform us of being closed via our
206   // |Close()|. Thus these should already be null.
207   DCHECK(!endpoints_[0]);
208   DCHECK(!endpoints_[1]);
209 }
210
211 MojoResult MessagePipe::EnqueueMessageInternal(
212     unsigned port,
213     scoped_ptr<MessageInTransit> message,
214     std::vector<DispatcherTransport>* transports) {
215   DCHECK(port == 0 || port == 1);
216   DCHECK(message);
217
218   if (message->type() == MessageInTransit::kTypeMessagePipe) {
219     DCHECK(!transports);
220     return HandleControlMessage(port, message.Pass());
221   }
222
223   DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
224
225   base::AutoLock locker(lock_);
226   DCHECK(endpoints_[GetPeerPort(port)]);
227
228   // The destination port need not be open, unlike the source port.
229   if (!endpoints_[port])
230     return MOJO_RESULT_FAILED_PRECONDITION;
231
232   if (transports) {
233     MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
234     if (result != MOJO_RESULT_OK)
235       return result;
236   }
237
238   // The endpoint's |EnqueueMessage()| may not report failure.
239   endpoints_[port]->EnqueueMessage(message.Pass());
240   return MOJO_RESULT_OK;
241 }
242
243 MojoResult MessagePipe::AttachTransportsNoLock(
244     unsigned port,
245     MessageInTransit* message,
246     std::vector<DispatcherTransport>* transports) {
247   DCHECK(!message->has_dispatchers());
248
249   // You're not allowed to send either handle to a message pipe over the message
250   // pipe, so check for this. (The case of trying to write a handle to itself is
251   // taken care of by |Core|. That case kind of makes sense, but leads to
252   // complications if, e.g., both sides try to do the same thing with their
253   // respective handles simultaneously. The other case, of trying to write the
254   // peer handle to a handle, doesn't make sense -- since no handle will be
255   // available to read the message from.)
256   for (size_t i = 0; i < transports->size(); i++) {
257     if (!(*transports)[i].is_valid())
258       continue;
259     if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
260       MessagePipeDispatcherTransport mp_transport((*transports)[i]);
261       if (mp_transport.GetMessagePipe() == this) {
262         // The other case should have been disallowed by |Core|. (Note: |port|
263         // is the peer port of the handle given to |WriteMessage()|.)
264         DCHECK_EQ(mp_transport.GetPort(), port);
265         return MOJO_RESULT_INVALID_ARGUMENT;
266       }
267     }
268   }
269
270   // Clone the dispatchers and attach them to the message. (This must be done as
271   // a separate loop, since we want to leave the dispatchers alone on failure.)
272   scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
273   dispatchers->reserve(transports->size());
274   for (size_t i = 0; i < transports->size(); i++) {
275     if ((*transports)[i].is_valid()) {
276       dispatchers->push_back(
277           (*transports)[i].CreateEquivalentDispatcherAndClose());
278     } else {
279       LOG(WARNING) << "Enqueueing null dispatcher";
280       dispatchers->push_back(scoped_refptr<Dispatcher>());
281     }
282   }
283   message->SetDispatchers(dispatchers.Pass());
284   return MOJO_RESULT_OK;
285 }
286
287 MojoResult MessagePipe::HandleControlMessage(
288     unsigned /*port*/,
289     scoped_ptr<MessageInTransit> message) {
290   LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
291                << message->subtype();
292   return MOJO_RESULT_UNKNOWN;
293 }
294
295 }  // namespace system
296 }  // namespace mojo