a311695b23311467ca94da0d11dea0b717f32da2
[platform/framework/web/crosswalk.git] / src / mojo / system / channel.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/channel.h"
6
7 #include <algorithm>
8
9 #include "base/basictypes.h"
10 #include "base/bind.h"
11 #include "base/compiler_specific.h"
12 #include "base/logging.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
17
18 namespace mojo {
19 namespace system {
20
21 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
22                    MessageInTransit::kInvalidEndpointId,
23                kBootstrapEndpointId_is_invalid);
24
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
26     Channel::kBootstrapEndpointId;
27
28 Channel::EndpointInfo::EndpointInfo()
29     : state(STATE_NORMAL),
30       port() {
31 }
32
33 Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
34                                     unsigned port)
35     : state(STATE_NORMAL),
36       message_pipe(message_pipe),
37       port(port) {
38 }
39
40 Channel::EndpointInfo::~EndpointInfo() {
41 }
42
43 Channel::Channel()
44     : is_running_(false),
45       next_local_id_(kBootstrapEndpointId) {
46 }
47
48 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
49   DCHECK(creation_thread_checker_.CalledOnValidThread());
50   DCHECK(raw_channel);
51
52   // No need to take |lock_|, since this must be called before this object
53   // becomes thread-safe.
54   DCHECK(!is_running_no_lock());
55   raw_channel_ = raw_channel.Pass();
56
57   if (!raw_channel_->Init(this)) {
58     raw_channel_.reset();
59     return false;
60   }
61
62   is_running_ = true;
63   return true;
64 }
65
66 void Channel::Shutdown() {
67   DCHECK(creation_thread_checker_.CalledOnValidThread());
68
69   IdToEndpointInfoMap to_destroy;
70   {
71     base::AutoLock locker(lock_);
72     if (!is_running_no_lock())
73       return;
74
75     // Note: Don't reset |raw_channel_|, in case we're being called from within
76     // |OnReadMessage()| or |OnFatalError()|.
77     raw_channel_->Shutdown();
78     is_running_ = false;
79
80     // We need to deal with it outside the lock.
81     std::swap(to_destroy, local_id_to_endpoint_info_map_);
82   }
83
84   size_t num_live = 0;
85   size_t num_zombies = 0;
86   for (IdToEndpointInfoMap::iterator it = to_destroy.begin();
87        it != to_destroy.end();
88        ++it) {
89     if (it->second.state == EndpointInfo::STATE_NORMAL) {
90       it->second.message_pipe->OnRemove(it->second.port);
91       num_live++;
92     } else {
93       DCHECK(!it->second.message_pipe);
94       num_zombies++;
95     }
96   }
97   DVLOG_IF(2, num_live || num_zombies)
98       << "Shut down Channel with " << num_live << " live endpoints and "
99       << num_zombies << " zombies";
100 }
101
102 MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
103     scoped_refptr<MessagePipe> message_pipe,
104     unsigned port) {
105   DCHECK(message_pipe);
106   DCHECK(port == 0 || port == 1);
107
108   MessageInTransit::EndpointId local_id;
109   {
110     base::AutoLock locker(lock_);
111
112     while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
113            local_id_to_endpoint_info_map_.find(next_local_id_) !=
114                local_id_to_endpoint_info_map_.end())
115       next_local_id_++;
116
117     local_id = next_local_id_;
118     next_local_id_++;
119
120     // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
121     // some expensive reference count increment/decrements.) Once this is done,
122     // we should be able to delete |EndpointInfo|'s default constructor.
123     local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
124   }
125
126   // This might fail if that port got an |OnPeerClose()| before attaching.
127   if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
128     return local_id;
129
130   // Note: If it failed, quite possibly the endpoint info was removed from that
131   // map (there's a race between us adding it to the map above and calling
132   // |Attach()|). And even if an entry exists for |local_id|, we need to check
133   // that it's the one we added (and not some other one that was added since).
134   {
135     base::AutoLock locker(lock_);
136     IdToEndpointInfoMap::iterator it =
137         local_id_to_endpoint_info_map_.find(local_id);
138     if (it != local_id_to_endpoint_info_map_.end() &&
139         it->second.message_pipe.get() == message_pipe.get() &&
140         it->second.port == port) {
141       DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
142       // TODO(vtl): FIXME -- This is wrong. We need to specify (to
143       // |AttachMessagePipeEndpoint()| who's going to be responsible for calling
144       // |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
145       // |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
146       // run, then we'll get messages to an "invalid" local ID (for running, for
147       // removal).
148       local_id_to_endpoint_info_map_.erase(it);
149     }
150   }
151   return MessageInTransit::kInvalidEndpointId;
152 }
153
154 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
155                                      MessageInTransit::EndpointId remote_id) {
156   EndpointInfo endpoint_info;
157   {
158     base::AutoLock locker(lock_);
159
160     IdToEndpointInfoMap::const_iterator it =
161         local_id_to_endpoint_info_map_.find(local_id);
162     if (it == local_id_to_endpoint_info_map_.end())
163       return false;
164     endpoint_info = it->second;
165   }
166
167   // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
168   // and ignore it.
169   if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
170     DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
171                 "(local ID " << local_id << ", remote ID " << remote_id << ")";
172     return true;
173   }
174
175   // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
176   // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
177   endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
178   return true;
179 }
180
181 void Channel::RunRemoteMessagePipeEndpoint(
182     MessageInTransit::EndpointId local_id,
183     MessageInTransit::EndpointId remote_id) {
184 #if DCHECK_IS_ON
185   {
186     base::AutoLock locker(lock_);
187     DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
188                local_id_to_endpoint_info_map_.end());
189   }
190 #endif
191
192   if (!SendControlMessage(
193            MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
194            local_id, remote_id)) {
195     HandleLocalError(base::StringPrintf(
196         "Failed to send message to run remote message pipe endpoint (local ID "
197         "%u, remote ID %u)",
198         static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
199   }
200 }
201
202 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
203   base::AutoLock locker(lock_);
204   if (!is_running_no_lock()) {
205     // TODO(vtl): I think this is probably not an error condition, but I should
206     // think about it (and the shutdown sequence) more carefully.
207     LOG(WARNING) << "WriteMessage() after shutdown";
208     return false;
209   }
210
211   return raw_channel_->WriteMessage(message.Pass());
212 }
213
214 bool Channel::IsWriteBufferEmpty() {
215   base::AutoLock locker(lock_);
216   if (!is_running_no_lock())
217     return true;
218   return raw_channel_->IsWriteBufferEmpty();
219 }
220
221 void Channel::DetachMessagePipeEndpoint(
222     MessageInTransit::EndpointId local_id,
223     MessageInTransit::EndpointId remote_id) {
224   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
225
226   bool should_send_remove_message = false;
227   {
228     base::AutoLock locker_(lock_);
229     if (!is_running_no_lock())
230       return;
231
232     IdToEndpointInfoMap::iterator it =
233         local_id_to_endpoint_info_map_.find(local_id);
234     DCHECK(it != local_id_to_endpoint_info_map_.end());
235
236     switch (it->second.state) {
237       case EndpointInfo::STATE_NORMAL:
238         it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
239         it->second.message_pipe = NULL;
240         should_send_remove_message =
241             (remote_id != MessageInTransit::kInvalidEndpointId);
242         break;
243       case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
244         local_id_to_endpoint_info_map_.erase(it);
245         break;
246       case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
247         NOTREACHED();
248         break;
249       case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
250         it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
251         break;
252     }
253   }
254   if (!should_send_remove_message)
255     return;
256
257   if (!SendControlMessage(
258            MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
259            local_id, remote_id)) {
260     HandleLocalError(base::StringPrintf(
261         "Failed to send message to remove remote message pipe endpoint (local "
262         "ID %u, remote ID %u)",
263         static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
264   }
265 }
266
267 size_t Channel::GetSerializedPlatformHandleSize() const {
268   return raw_channel_->GetSerializedPlatformHandleSize();
269 }
270
271 Channel::~Channel() {
272   // The channel should have been shut down first.
273   DCHECK(!is_running_no_lock());
274 }
275
276 void Channel::OnReadMessage(
277     const MessageInTransit::View& message_view,
278     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
279   switch (message_view.type()) {
280     case MessageInTransit::kTypeMessagePipeEndpoint:
281     case MessageInTransit::kTypeMessagePipe:
282       OnReadMessageForDownstream(message_view, platform_handles.Pass());
283       break;
284     case MessageInTransit::kTypeChannel:
285       OnReadMessageForChannel(message_view, platform_handles.Pass());
286       break;
287     default:
288       HandleRemoteError(base::StringPrintf(
289           "Received message of invalid type %u",
290           static_cast<unsigned>(message_view.type())));
291       break;
292   }
293 }
294
295 void Channel::OnFatalError(FatalError fatal_error) {
296   switch (fatal_error) {
297     case FATAL_ERROR_READ:
298       // Most read errors aren't notable: they just reflect that the other side
299       // tore down the channel.
300       DVLOG(1) << "RawChannel fatal error (read)";
301       break;
302     case FATAL_ERROR_WRITE:
303       // Write errors are slightly notable: they probably shouldn't happen under
304       // normal operation (but maybe the other side crashed).
305       LOG(WARNING) << "RawChannel fatal error (write)";
306       break;
307   }
308   Shutdown();
309 }
310
311 void Channel::OnReadMessageForDownstream(
312     const MessageInTransit::View& message_view,
313     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
314   DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
315          message_view.type() == MessageInTransit::kTypeMessagePipe);
316
317   MessageInTransit::EndpointId local_id = message_view.destination_id();
318   if (local_id == MessageInTransit::kInvalidEndpointId) {
319     HandleRemoteError("Received message with no destination ID");
320     return;
321   }
322
323   EndpointInfo endpoint_info;
324   {
325     base::AutoLock locker(lock_);
326
327     // Since we own |raw_channel_|, and this method and |Shutdown()| should only
328     // be called from the creation thread, |raw_channel_| should never be null
329     // here.
330     DCHECK(is_running_no_lock());
331
332     IdToEndpointInfoMap::const_iterator it =
333         local_id_to_endpoint_info_map_.find(local_id);
334     if (it == local_id_to_endpoint_info_map_.end()) {
335       HandleRemoteError(base::StringPrintf(
336           "Received a message for nonexistent local destination ID %u",
337           static_cast<unsigned>(local_id)));
338       // This is strongly indicative of some problem. However, it's not a fatal
339       // error, since it may indicate a bug (or hostile) remote process. Don't
340       // die even for Debug builds, since handling this properly needs to be
341       // tested (TODO(vtl)).
342       DLOG(ERROR) << "This should not happen under normal operation.";
343       return;
344     }
345     endpoint_info = it->second;
346   }
347
348   // Ignore messages for zombie endpoints (not an error).
349   if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
350     DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
351              << local_id << ", remote ID = " << message_view.source_id() << ")";
352     return;
353   }
354
355   // We need to duplicate the message (data), because |EnqueueMessage()| will
356   // take ownership of it.
357   scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
358   if (message_view.transport_data_buffer_size() > 0) {
359     DCHECK(message_view.transport_data_buffer());
360     message->SetDispatchers(
361         TransportData::DeserializeDispatchers(
362             message_view.transport_data_buffer(),
363             message_view.transport_data_buffer_size(),
364             platform_handles.Pass(),
365             this));
366   }
367   MojoResult result = endpoint_info.message_pipe->EnqueueMessage(
368       MessagePipe::GetPeerPort(endpoint_info.port), message.Pass());
369   if (result != MOJO_RESULT_OK) {
370     // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
371     // has been closed (in an unavoidable race). This might also be a "remote"
372     // error, e.g., if the remote side is sending invalid control messages (to
373     // the message pipe).
374     HandleLocalError(base::StringPrintf(
375         "Failed to enqueue message to local ID %u (result %d)",
376         static_cast<unsigned>(local_id), static_cast<int>(result)));
377     return;
378   }
379 }
380
381 void Channel::OnReadMessageForChannel(
382     const MessageInTransit::View& message_view,
383     embedder::ScopedPlatformHandleVectorPtr platform_handles) {
384   DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
385
386   // Currently, no channel messages take platform handles.
387   if (platform_handles) {
388     HandleRemoteError(
389         "Received invalid channel message (has platform handles)");
390     NOTREACHED();
391     return;
392   }
393
394   switch (message_view.subtype()) {
395     case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
396       DVLOG(2) << "Handling channel message to run message pipe (local ID "
397                << message_view.destination_id() << ", remote ID "
398                << message_view.source_id() << ")";
399       if (!RunMessagePipeEndpoint(message_view.destination_id(),
400                                   message_view.source_id())) {
401         HandleRemoteError(
402             "Received invalid channel message to run message pipe");
403       }
404       break;
405     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
406       DVLOG(2) << "Handling channel message to remove message pipe (local ID "
407                << message_view.destination_id() << ", remote ID "
408                << message_view.source_id() << ")";
409       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
410                                      message_view.source_id())) {
411         HandleRemoteError(
412             "Received invalid channel message to remove message pipe");
413       }
414       break;
415     case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
416       DVLOG(2) << "Handling channel message to ack remove message pipe (local "
417                   "ID "
418                << message_view.destination_id() << ", remote ID "
419                << message_view.source_id() << ")";
420       if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
421                                      message_view.source_id())) {
422         HandleRemoteError(
423             "Received invalid channel message to ack remove message pipe");
424       }
425       break;
426     default:
427       HandleRemoteError("Received invalid channel message");
428       NOTREACHED();
429       break;
430   }
431 }
432
433 bool Channel::RemoveMessagePipeEndpoint(
434     MessageInTransit::EndpointId local_id,
435     MessageInTransit::EndpointId remote_id) {
436   EndpointInfo endpoint_info;
437   {
438     base::AutoLock locker(lock_);
439
440     IdToEndpointInfoMap::iterator it =
441         local_id_to_endpoint_info_map_.find(local_id);
442     if (it == local_id_to_endpoint_info_map_.end()) {
443       DVLOG(2) << "Remove message pipe error: not found";
444       return false;
445     }
446
447     // If it's waiting for the remove ack, just do it and return.
448     if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
449       local_id_to_endpoint_info_map_.erase(it);
450       return true;
451     }
452
453     if (it->second.state != EndpointInfo::STATE_NORMAL) {
454       DVLOG(2) << "Remove message pipe error: wrong state";
455       return false;
456     }
457
458     it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
459     endpoint_info = it->second;
460     it->second.message_pipe = NULL;
461   }
462
463   if (!SendControlMessage(
464            MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
465            local_id, remote_id)) {
466     HandleLocalError(base::StringPrintf(
467         "Failed to send message to remove remote message pipe endpoint ack "
468         "(local ID %u, remote ID %u)",
469         static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
470   }
471
472   endpoint_info.message_pipe->OnRemove(endpoint_info.port);
473
474   return true;
475 }
476
477 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
478                                  MessageInTransit::EndpointId local_id,
479                                  MessageInTransit::EndpointId remote_id) {
480   DVLOG(2) << "Sending channel control message: subtype " << subtype
481            << ", local ID " << local_id << ", remote ID " << remote_id;
482   scoped_ptr<MessageInTransit> message(new MessageInTransit(
483       MessageInTransit::kTypeChannel, subtype, 0, NULL));
484   message->set_source_id(local_id);
485   message->set_destination_id(remote_id);
486   return WriteMessage(message.Pass());
487 }
488
489 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
490   // TODO(vtl): Is this how we really want to handle this? Probably we want to
491   // terminate the connection, since it's spewing invalid stuff.
492   LOG(WARNING) << error_message;
493 }
494
495 void Channel::HandleLocalError(const base::StringPiece& error_message) {
496   // TODO(vtl): Is this how we really want to handle this?
497   // Sometimes we'll want to propagate the error back to the message pipe
498   // (endpoint), and notify it that the remote is (effectively) closed.
499   // Sometimes we'll want to kill the channel (and notify all the endpoints that
500   // their remotes are dead.
501   LOG(WARNING) << error_message;
502 }
503
504 }  // namespace system
505 }  // namespace mojo