1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/local_message_pipe_endpoint.h"
9 #include "base/logging.h"
10 #include "mojo/system/dispatcher.h"
11 #include "mojo/system/message_in_transit.h"
// Constructs an endpoint whose |is_open_| and |is_peer_open_| flags both start
// out true.
16 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
17 : is_open_(true), is_peer_open_(true) {
// Destructor. DCHECKs that |message_queue_| holds no messages at destruction
// time.
20 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
22 DCHECK(message_queue_.IsEmpty()); // Should be implied by not being open.
// Returns the |MessagePipeEndpoint::Type| value identifying this endpoint.
// NOTE(review): presumably the "local" enumerator -- confirm against the
// MessagePipeEndpoint declaration.
25 MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const {
// Records that the peer is no longer open (|is_peer_open_| becomes false) and,
// if that changes this endpoint's handle signals state, notifies
// |waiter_list_| of the new state.
//
// Must only be called while the peer is still marked open (DCHECKed).
// NOTE(review): the meaning of the bool result should be confirmed against
// the MessagePipeEndpoint interface.
29 bool LocalMessagePipeEndpoint::OnPeerClose() {
31 DCHECK(is_peer_open_);
// Snapshot the signals state on both sides of the flag flip so that waiters
// are only awoken when the state actually changed.
33 HandleSignalsState old_state = GetHandleSignalsState();
34 is_peer_open_ = false;
35 HandleSignalsState new_state = GetHandleSignalsState();
37 if (!new_state.equals(old_state))
38 waiter_list_.AwakeWaitersForStateChange(new_state);
// Hands |message| over to |message_queue_| (via Pass()) and notifies
// |waiter_list_| with the endpoint's current handle signals state.
//
// Must only be called while the peer is marked open (DCHECKed).
43 void LocalMessagePipeEndpoint::EnqueueMessage(
44 scoped_ptr<MessageInTransit> message) {
46 DCHECK(is_peer_open_);
// Remember whether the queue was empty before the new message is added.
// NOTE(review): |was_empty| presumably guards the wake-up below (only an
// empty -> non-empty transition alters the state computed from the queue) --
// confirm.
48 bool was_empty = message_queue_.IsEmpty();
49 message_queue_.AddMessage(message.Pass());
51 waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
// Closes this endpoint; clears every message still held in |message_queue_|.
// NOTE(review): presumably also resets |is_open_| -- confirm.
54 void LocalMessagePipeEndpoint::Close() {
57 message_queue_.Clear();
// Cancels every waiter registered on this endpoint by forwarding to
// |waiter_list_|.
60 void LocalMessagePipeEndpoint::CancelAllWaiters() {
62 waiter_list_.CancelAllWaiters();
// Attempts to read the message at the front of |message_queue_|.
//
// Parameters:
//   |bytes|           - user buffer that receives the message payload.
//   |num_bytes|       - in: capacity of |bytes| (treated as 0 when null);
//                       out: when non-null, set to the size of the front
//                       message.
//   |dispatchers|     - receives the message's dispatchers; must be null or
//                       empty on entry (DCHECKed).
//   |num_dispatchers| - in: maximum number of dispatchers accepted (treated
//                       as 0 when null); out: the number of dispatchers
//                       attached to the message.
//   |flags|           - if MOJO_READ_MESSAGE_FLAG_MAY_DISCARD is set, the
//                       message is removed from the queue even when it did
//                       not fit.
//
// Returns:
//   MOJO_RESULT_SHOULD_WAIT          - queue is empty and the peer is open.
//   MOJO_RESULT_FAILED_PRECONDITION  - queue is empty and the peer is closed.
//   MOJO_RESULT_RESOURCE_EXHAUSTED / MOJO_RESULT_OK - otherwise.
//   NOTE(review): presumably RESOURCE_EXHAUSTED is returned exactly when
//   |enough_space| ended up false -- confirm.
65 MojoResult LocalMessagePipeEndpoint::ReadMessage(
66 UserPointer<void> bytes,
67 UserPointer<uint32_t> num_bytes,
68 DispatcherVector* dispatchers,
69 uint32_t* num_dispatchers,
70 MojoReadMessageFlags flags) {
72 DCHECK(!dispatchers || dispatchers->empty());
// Capture the caller-supplied capacities up front, before the out-parameters
// are overwritten with the actual sizes below.
74 const uint32_t max_bytes = num_bytes.IsNull() ? 0 : num_bytes.Get();
75 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
// Nothing queued: the result depends on whether more messages can still
// arrive from the peer.
77 if (message_queue_.IsEmpty()) {
78 return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT
79 : MOJO_RESULT_FAILED_PRECONDITION;
82 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
83 // and release the lock immediately.
// |enough_space| tracks whether the caller's buffers were large enough for
// both the payload and the dispatchers.
84 bool enough_space = true;
// Peek rather than pop: the message stays queued until it is explicitly
// discarded further down.
85 MessageInTransit* message = message_queue_.PeekMessage();
// Report the actual payload size to the caller whenever |num_bytes| is
// provided.
86 if (!num_bytes.IsNull())
87 num_bytes.Put(message->num_bytes());
// Copy the payload out only if it fits in the caller's buffer.
88 if (message->num_bytes() <= max_bytes)
89 bytes.PutArray(message->bytes(), message->num_bytes());
// Dispatcher handling: report the attached count and, if the caller allowed
// for at least that many, transfer them by swapping vectors.
93 if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
95 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
97 if (queued_dispatchers->empty()) {
99 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
101 dispatchers->swap(*queued_dispatchers);
103 enough_space = false;
// NOTE(review): presumably the branch for a message without dispatchers --
// confirm.
108 *num_dispatchers = 0;
// Remove the message from the queue if it was fully delivered, or if the
// caller allowed it to be discarded regardless.
113 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
114 message_queue_.DiscardMessage();
116 // Now it's empty, thus no longer readable.
117 if (message_queue_.IsEmpty()) {
118 // It's currently not possible to wait for non-readability, but we should
119 // do the state change anyway.
120 waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
125 return MOJO_RESULT_RESOURCE_EXHAUSTED;
127 return MOJO_RESULT_OK;
// Builds the endpoint's current HandleSignalsState (satisfied and satisfiable
// signal sets) in |rv|.
//
// A non-empty |message_queue_| makes MOJO_HANDLE_SIGNAL_READABLE both
// satisfied and satisfiable.
// NOTE(review): the WRITABLE / READABLE|WRITABLE additions below are
// presumably conditional on |is_peer_open_| -- confirm.
130 HandleSignalsState LocalMessagePipeEndpoint::GetHandleSignalsState() const {
131 HandleSignalsState rv;
132 if (!message_queue_.IsEmpty()) {
133 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
134 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
137 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
138 rv.satisfiable_signals |=
139 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
// Adds a waiter for |signals| to |waiter_list_|, unless the current handle
// signals state already decides the outcome.
//
// Returns:
//   MOJO_RESULT_ALREADY_EXISTS       - the current state already satisfies
//                                      |signals|; the waiter is not added.
//   MOJO_RESULT_FAILED_PRECONDITION  - the current state cannot satisfy
//                                      |signals|; the waiter is not added.
//   MOJO_RESULT_OK                   - the waiter was added to
//                                      |waiter_list_| together with |context|.
// In the two non-OK cases the current state is written to |*signals_state|.
// NOTE(review): confirm whether a null |signals_state| is tolerated.
144 MojoResult LocalMessagePipeEndpoint::AddWaiter(
146 MojoHandleSignals signals,
148 HandleSignalsState* signals_state) {
151 HandleSignalsState state = GetHandleSignalsState();
152 if (state.satisfies(signals)) {
154 *signals_state = state;
155 return MOJO_RESULT_ALREADY_EXISTS;
157 if (!state.can_satisfy(signals)) {
159 *signals_state = state;
160 return MOJO_RESULT_FAILED_PRECONDITION;
163 waiter_list_.AddWaiter(waiter, signals, context);
164 return MOJO_RESULT_OK;
// Removes |waiter| from |waiter_list_| and reports the endpoint's current
// handle signals state through |signals_state|.
// NOTE(review): confirm whether a null |signals_state| is tolerated.
167 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter,
168 HandleSignalsState* signals_state) {
170 waiter_list_.RemoveWaiter(waiter);
172 *signals_state = GetHandleSignalsState();
175 } // namespace system