1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/local_message_pipe_endpoint.h"
9 #include "base/logging.h"
10 #include "mojo/system/message_in_transit.h"
15 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
20 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
24 void LocalMessagePipeEndpoint::OnPeerClose() {
26 DCHECK(is_peer_open_);
28 MojoWaitFlags old_satisfied_flags = SatisfiedFlags();
29 MojoWaitFlags old_satisfiable_flags = SatisfiableFlags();
30 is_peer_open_ = false;
31 MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
32 MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
34 if (new_satisfied_flags != old_satisfied_flags ||
35 new_satisfiable_flags != old_satisfiable_flags) {
36 waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
37 new_satisfiable_flags);
41 MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
42 const void* bytes, uint32_t num_bytes,
43 const MojoHandle* handles, uint32_t num_handles,
44 MojoWriteMessageFlags /*flags*/) {
46 DCHECK(is_peer_open_);
48 bool was_empty = message_queue_.empty();
50 // TODO(vtl): Eventually (with C++11), this should be an |emplace_back()|.
51 message_queue_.push_back(MessageInTransit::Create(bytes, num_bytes));
52 // TODO(vtl): Support sending handles.
55 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
59 return MOJO_RESULT_OK;
62 void LocalMessagePipeEndpoint::CancelAllWaiters() {
64 waiter_list_.CancelAllWaiters();
67 void LocalMessagePipeEndpoint::Close() {
70 for (std::deque<MessageInTransit*>::iterator it = message_queue_.begin();
71 it != message_queue_.end();
75 message_queue_.clear();
78 MojoResult LocalMessagePipeEndpoint::ReadMessage(
79 void* bytes, uint32_t* num_bytes,
80 MojoHandle* handles, uint32_t* num_handles,
81 MojoReadMessageFlags flags) {
84 const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
85 // TODO(vtl): We'll need this later:
86 // const uint32_t max_handles = num_handles ? *num_handles : 0;
88 if (message_queue_.empty())
89 return MOJO_RESULT_NOT_FOUND;
91 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
92 // and release the lock immediately.
93 bool not_enough_space = false;
94 MessageInTransit* const message = message_queue_.front();
96 *num_bytes = message->data_size();
97 if (message->data_size() <= max_bytes)
98 memcpy(bytes, message->data(), message->data_size());
100 not_enough_space = true;
102 // TODO(vtl): Support receiving handles.
106 if (!not_enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
107 message_queue_.pop_front();
110 // Now it's empty, thus no longer readable.
111 if (message_queue_.empty()) {
112 // It's currently not possible to wait for non-readability, but we should
113 // do the state change anyway.
114 waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
119 if (not_enough_space)
120 return MOJO_RESULT_RESOURCE_EXHAUSTED;
122 return MOJO_RESULT_OK;
125 MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
127 MojoResult wake_result) {
130 if ((flags & SatisfiedFlags()))
131 return MOJO_RESULT_ALREADY_EXISTS;
132 if (!(flags & SatisfiableFlags()))
133 return MOJO_RESULT_FAILED_PRECONDITION;
135 waiter_list_.AddWaiter(waiter, flags, wake_result);
136 return MOJO_RESULT_OK;
139 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
141 waiter_list_.RemoveWaiter(waiter);
144 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() {
145 MojoWaitFlags satisfied_flags = 0;
146 if (!message_queue_.empty())
147 satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
149 satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
150 return satisfied_flags;
153 MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
154 MojoWaitFlags satisfiable_flags = 0;
155 if (!message_queue_.empty() || is_peer_open_)
156 satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
158 satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;
159 return satisfiable_flags;
162 } // namespace system