1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/edk/system/channel_endpoint.h"
7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/message_pipe.h"
10 #include "mojo/edk/system/transport_data.h"
// Constructs an endpoint bound to |message_pipe|. The endpoint starts with no
// channel (|channel_| is initialized to null). The DCHECKs below require that
// at least one of |message_pipe| and |message_queue| is non-null and that
// |port_| is either 0 or 1.
15 ChannelEndpoint::ChannelEndpoint(MessagePipe* message_pipe,
17 MessageInTransitQueue* message_queue)
18 : message_pipe_(message_pipe), port_(port), channel_(nullptr) {
19 DCHECK(message_pipe_.get() || message_queue);
20 DCHECK(port_ == 0 || port_ == 1);
// Exchange contents with the caller's queue. Presumably this moves any
// messages queued before the endpoint existed into |paused_message_queue_| --
// TODO confirm against MessageInTransitQueue::Swap().
23 paused_message_queue_.Swap(message_queue);
// Sends |message| over the channel, or holds it back if that is not possible
// yet. Takes |lock_| for the duration of the call. When there is no channel
// or no valid remote ID, the message is appended to |paused_message_queue_|;
// otherwise it is handed to |WriteMessageNoLock()| and that call's result is
// returned.
26 bool ChannelEndpoint::EnqueueMessage(scoped_ptr<MessageInTransit> message) {
29 base::AutoLock locker(lock_);
31 if (!channel_ || !remote_id_.is_valid()) {
32 // We may reach here if we haven't been attached or run yet.
33 // TODO(vtl): We may also reach here if the channel is shut down early for
34 // some reason (with live message pipes on it). We can't check |state_| yet,
35 // until it's protected under lock, but in this case we should return false
36 // (and not enqueue any messages).
37 paused_message_queue_.AddMessage(message.Pass());
41 // TODO(vtl): Currently, this only works in the "running" case.
42 DCHECK(remote_id_.is_valid());
44 return WriteMessageNoLock(message.Pass());
// Severs the link to the message pipe. Under |lock_|, drops the reference in
// |message_pipe_| (which must be set on entry), asks the channel to detach
// this endpoint, and resets both endpoint IDs to default-constructed values.
47 void ChannelEndpoint::DetachFromMessagePipe() {
49 base::AutoLock locker(lock_);
50 DCHECK(message_pipe_.get());
51 message_pipe_ = nullptr;
55 DCHECK(local_id_.is_valid());
56 // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should be valid
58 channel_->DetachEndpoint(this, local_id_, remote_id_);
// A default-constructed ChannelEndpointId is used as the "no ID" value here;
// the destructor DCHECKs that both IDs are not valid.
60 local_id_ = ChannelEndpointId();
61 remote_id_ = ChannelEndpointId();
// Attaches this endpoint to |channel| and starts it running.
//
// |local_id| and |remote_id| must both be valid, and the endpoint must not
// already have valid IDs (all DCHECKed). Under |lock_|, the remote ID is
// recorded and every message waiting in |paused_message_queue_| is written to
// the channel; a failed write is only logged as a warning. If the message
// pipe has already gone away by then, the endpoint is detached from the
// channel again and its IDs are reset.
65 void ChannelEndpoint::AttachAndRun(Channel* channel,
66 ChannelEndpointId local_id,
67 ChannelEndpointId remote_id) {
69 DCHECK(local_id.is_valid());
70 DCHECK(remote_id.is_valid());
72 base::AutoLock locker(lock_);
74 DCHECK(!local_id_.is_valid());
75 DCHECK(!remote_id_.is_valid());
78 remote_id_ = remote_id;
// Flush everything that was queued while the endpoint was not yet running.
80 while (!paused_message_queue_.IsEmpty()) {
81 LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage()))
82 << "Failed to write enqueue message to channel";
// The message pipe was detached before we got attached, so there is nothing
// left to serve: undo the attachment.
85 if (!message_pipe_.get()) {
86 channel_->DetachEndpoint(this, local_id_, remote_id_);
88 local_id_ = ChannelEndpointId();
89 remote_id_ = ChannelEndpointId();
// Handles a message read from the channel for this endpoint.
//
// Builds a MessageInTransit from |message_view|. If the view carries
// transport data, the dispatchers are deserialized from it (consuming
// |platform_handles|) and attached to the message. The message is then
// enqueued on the message pipe at the peer port.
//
// Returns true iff the message pipe's |EnqueueMessage()| reports
// MOJO_RESULT_OK.
93 bool ChannelEndpoint::OnReadMessage(
94 const MessageInTransit::View& message_view,
95 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
96 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
97 scoped_refptr<MessagePipe> message_pipe;
100 base::AutoLock locker(lock_);
102 if (!message_pipe_.get()) {
103 // This isn't a failure per se. (It just means that, e.g., the other end
104 // of the message pipe closed first.)
108 if (message_view.transport_data_buffer_size() > 0) {
109 DCHECK(message_view.transport_data_buffer());
110 message->SetDispatchers(TransportData::DeserializeDispatchers(
111 message_view.transport_data_buffer(),
112 message_view.transport_data_buffer_size(),
113 platform_handles.Pass(),
117 // Take a ref, and call |EnqueueMessage()| outside the lock.
118 message_pipe = message_pipe_;
// The local ref taken above keeps the pipe usable for this call even if
// |message_pipe_| is cleared concurrently.
122 MojoResult result = message_pipe->EnqueueMessage(
123 MessagePipe::GetPeerPort(port), message.Pass());
124 return (result == MOJO_RESULT_OK);
// Handles a disconnect notification for this endpoint. Takes a reference to
// the message pipe while holding |lock_| and then closes the pipe's port via
// that reference. The |message_pipe_| null check covers the case where the
// pipe has already been detached.
127 void ChannelEndpoint::OnDisconnect() {
128 scoped_refptr<MessagePipe> message_pipe;
131 base::AutoLock locker(lock_);
132 if (!message_pipe_.get())
135 // Take a ref, and call |Close()| outside the lock.
136 message_pipe = message_pipe_;
139 message_pipe->Close(port);
// Severs the link to the channel. Under |lock_|, resets both endpoint IDs to
// default-constructed values. As the comment below notes, the endpoint may
// already have been detached via |DetachFromMessagePipe()|.
142 void ChannelEndpoint::DetachFromChannel() {
143 base::AutoLock locker(lock_);
144 // This may already be null if we already detached from the channel in
145 // |DetachFromMessagePipe()| by calling |Channel::DetachEndpoint()| (and there
146 // are racing detaches).
150 DCHECK(local_id_.is_valid());
151 // TODO(vtl): Once we combine "run" into "attach", |remote_id_| should be valid
154 local_id_ = ChannelEndpointId();
155 remote_id_ = ChannelEndpointId();
// Destructor. Only checks invariants: by the time the endpoint is destroyed
// the message pipe reference must have been dropped and both endpoint IDs
// must have been reset.
158 ChannelEndpoint::~ChannelEndpoint() {
159 DCHECK(!message_pipe_.get());
161 DCHECK(!local_id_.is_valid());
162 DCHECK(!remote_id_.is_valid());
// Writes |message| to the channel. The caller must already hold |lock_|
// (asserted below), and both endpoint IDs must be valid.
//
// Before the write, the message's dispatchers are serialized and closed
// against |channel_|, and the message is stamped with this endpoint's local
// ID as source and its remote ID as destination.
//
// Returns whatever |Channel::WriteMessage()| returns.
165 bool ChannelEndpoint::WriteMessageNoLock(scoped_ptr<MessageInTransit> message) {
168 lock_.AssertAcquired();
171 DCHECK(local_id_.is_valid());
172 DCHECK(remote_id_.is_valid());
174 message->SerializeAndCloseDispatchers(channel_);
175 message->set_source_id(local_id_);
176 message->set_destination_id(remote_id_);
177 return channel_->WriteMessage(message.Pass());
180 } // namespace system