1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/proxy_message_pipe_endpoint.h"
9 #include "base/containers/hash_tables.h"
10 #include "base/logging.h"
11 #include "base/stl_util.h"
12 #include "mojo/system/channel.h"
13 #include "mojo/system/message_pipe_dispatcher.h"
18 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
19 : local_id_(MessageInTransit::kInvalidEndpointId),
20 remote_id_(MessageInTransit::kInvalidEndpointId),
25 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
26 DCHECK(!is_running());
27 DCHECK(!is_attached());
28 AssertConsistentState();
29 DCHECK(paused_message_queue_.empty());
32 void ProxyMessagePipeEndpoint::Close() {
36 DCHECK(is_attached());
37 channel_->DetachMessagePipeEndpoint(local_id_);
39 local_id_ = MessageInTransit::kInvalidEndpointId;
40 remote_id_ = MessageInTransit::kInvalidEndpointId;
42 for (std::deque<MessageInTransit*>::iterator it =
43 paused_message_queue_.begin();
44 it != paused_message_queue_.end();
48 paused_message_queue_.clear();
51 void ProxyMessagePipeEndpoint::OnPeerClose() {
53 DCHECK(is_peer_open_);
55 is_peer_open_ = false;
56 MessageInTransit* message =
57 MessageInTransit::Create(MessageInTransit::kTypeMessagePipe,
58 MessageInTransit::kSubtypeMessagePipePeerClosed,
60 EnqueueMessageInternal(message);
63 MojoResult ProxyMessagePipeEndpoint::EnqueueMessage(
64 MessageInTransit* message,
65 std::vector<DispatcherTransport>* transports) {
66 DCHECK(!transports || !transports->empty());
69 AttachAndCloseDispatchers(message, transports);
71 EnqueueMessageInternal(message);
72 return MOJO_RESULT_OK;
75 void ProxyMessagePipeEndpoint::Attach(scoped_refptr<Channel> channel,
76 MessageInTransit::EndpointId local_id) {
77 DCHECK(channel.get());
78 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
80 DCHECK(!is_attached());
82 AssertConsistentState();
85 AssertConsistentState();
88 void ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id) {
89 // Assertions about arguments:
90 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
92 // Assertions about current state:
93 DCHECK(is_attached());
94 DCHECK(!is_running());
96 AssertConsistentState();
97 remote_id_ = remote_id;
98 AssertConsistentState();
100 for (std::deque<MessageInTransit*>::iterator it =
101 paused_message_queue_.begin(); it != paused_message_queue_.end();
103 EnqueueMessageInternal(*it);
104 paused_message_queue_.clear();
107 void ProxyMessagePipeEndpoint::AttachAndCloseDispatchers(
108 MessageInTransit* message,
109 std::vector<DispatcherTransport>* transports) {
111 DCHECK(!transports->empty());
114 LOG(ERROR) << "Sending handles over remote message pipes not yet supported "
115 "(closing sent handles)";
116 for (size_t i = 0; i < transports->size(); i++)
117 (*transports)[i].Close();
120 // Note: We may have to enqueue messages even when our (local) peer isn't open
121 // -- it may have been written to and closed immediately, before we were ready.
122 // This case is handled in |Run()| (which will call us).
123 void ProxyMessagePipeEndpoint::EnqueueMessageInternal(
124 MessageInTransit* message) {
128 message->set_source_id(local_id_);
129 message->set_destination_id(remote_id_);
130 // If it fails at this point, the message gets dropped. (This is no
131 // different from any other in-transit errors.)
132 // Note: |WriteMessage()| will destroy the message even on failure.
133 if (!channel_->WriteMessage(message))
134 LOG(WARNING) << "Failed to write message to channel";
136 paused_message_queue_.push_back(message);
141 void ProxyMessagePipeEndpoint::AssertConsistentState() const {
143 DCHECK_NE(local_id_, MessageInTransit::kInvalidEndpointId);
144 } else { // Not attached.
145 DCHECK_EQ(local_id_, MessageInTransit::kInvalidEndpointId);
146 DCHECK_EQ(remote_id_, MessageInTransit::kInvalidEndpointId);
151 } // namespace system