Upstream version 5.34.104.0
[platform/framework/web/crosswalk.git] / src / mojo / system / proxy_message_pipe_endpoint.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/system/proxy_message_pipe_endpoint.h"
6
7 #include <string.h>
8
9 #include "base/containers/hash_tables.h"
10 #include "base/logging.h"
11 #include "base/stl_util.h"
12 #include "mojo/system/channel.h"
13 #include "mojo/system/message_pipe_dispatcher.h"
14
15 namespace mojo {
16 namespace system {
17
18 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
19     : local_id_(MessageInTransit::kInvalidEndpointId),
20       remote_id_(MessageInTransit::kInvalidEndpointId),
21       is_open_(true),
22       is_peer_open_(true) {
23 }
24
25 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
26   DCHECK(!is_running());
27   DCHECK(!is_attached());
28   AssertConsistentState();
29   DCHECK(paused_message_queue_.empty());
30 }
31
32 void ProxyMessagePipeEndpoint::Close() {
33   DCHECK(is_open_);
34   is_open_ = false;
35
36   DCHECK(is_attached());
37   channel_->DetachMessagePipeEndpoint(local_id_);
38   channel_ = NULL;
39   local_id_ = MessageInTransit::kInvalidEndpointId;
40   remote_id_ = MessageInTransit::kInvalidEndpointId;
41
42   for (std::deque<MessageInTransit*>::iterator it =
43            paused_message_queue_.begin();
44        it != paused_message_queue_.end();
45        ++it) {
46     (*it)->Destroy();
47   }
48   paused_message_queue_.clear();
49 }
50
51 void ProxyMessagePipeEndpoint::OnPeerClose() {
52   DCHECK(is_open_);
53   DCHECK(is_peer_open_);
54
55   is_peer_open_ = false;
56   MessageInTransit* message =
57       MessageInTransit::Create(MessageInTransit::kTypeMessagePipe,
58                                MessageInTransit::kSubtypeMessagePipePeerClosed,
59                                NULL, 0, 0);
60   EnqueueMessageInternal(message);
61 }
62
63 MojoResult ProxyMessagePipeEndpoint::EnqueueMessage(
64     MessageInTransit* message,
65     std::vector<DispatcherTransport>* transports) {
66   DCHECK(!transports || !transports->empty());
67
68   if (transports)
69     AttachAndCloseDispatchers(message, transports);
70
71   EnqueueMessageInternal(message);
72   return MOJO_RESULT_OK;
73 }
74
75 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
76                                       MessageInTransit::EndpointId local_id) {
77   DCHECK(channel.get());
78   DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
79
80   DCHECK(!is_attached());
81
82   AssertConsistentState();
83   channel_ = channel;
84   local_id_ = local_id;
85   AssertConsistentState();
86 }
87
88 void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
89   // Assertions about arguments:
90   DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
91
92   // Assertions about current state:
93   DCHECK(is_attached());
94   DCHECK(!is_running());
95
96   AssertConsistentState();
97   remote_id_ = remote_id;
98   AssertConsistentState();
99
100   for (std::deque<MessageInTransit*>::iterator it =
101            paused_message_queue_.begin(); it != paused_message_queue_.end();
102        ++it)
103     EnqueueMessageInternal(*it);
104   paused_message_queue_.clear();
105 }
106
107 void ProxyMessagePipeEndpoint::AttachAndCloseDispatchers(
108     MessageInTransit* message,
109     std::vector<DispatcherTransport>* transports) {
110   DCHECK(transports);
111   DCHECK(!transports->empty());
112
113   // TODO(vtl)
114   LOG(ERROR) << "Sending handles over remote message pipes not yet supported "
115                 "(closing sent handles)";
116   for (size_t i = 0; i < transports->size(); i++)
117     (*transports)[i].Close();
118 }
119
120 // Note: We may have to enqueue messages even when our (local) peer isn't open
121 // -- it may have been written to and closed immediately, before we were ready.
122 // This case is handled in |Run()| (which will call us).
123 void ProxyMessagePipeEndpoint::EnqueueMessageInternal(
124     MessageInTransit* message) {
125   DCHECK(is_open_);
126
127   if (is_running()) {
128     message->set_source_id(local_id_);
129     message->set_destination_id(remote_id_);
130     // If it fails at this point, the message gets dropped. (This is no
131     // different from any other in-transit errors.)
132     // Note: |WriteMessage()| will destroy the message even on failure.
133     if (!channel_->WriteMessage(message))
134       LOG(WARNING) << "Failed to write message to channel";
135   } else {
136     paused_message_queue_.push_back(message);
137   }
138 }
139
140 #ifndef NDEBUG
141 void ProxyMessagePipeEndpoint::AssertConsistentState() const {
142   if (is_attached()) {
143     DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
144   } else {  // Not attached.
145     DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
146     DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
147   }
148 }
149 #endif
150
151 }  // namespace system
152 }  // namespace mojo