namespace mojo {
namespace system {
-LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry()
- : message_(NULL) {
-}
-
-// See comment in header file.
-LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry(
- const MessageQueueEntry& other)
- : message_(NULL) {
- DCHECK(!other.message_);
- DCHECK(other.dispatchers_.empty());
-}
-
-LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() {
- if (message_)
- message_->Destroy();
- // Close all the dispatchers.
- for (size_t i = 0; i < dispatchers_.size(); i++) {
- if (!dispatchers_[i].get())
- continue;
-
- // Note: Taking the |Dispatcher| locks is okay, since no one else should
- // have a reference to the dispatchers (and the locks shouldn't be held).
- DCHECK(dispatchers_[i]->HasOneRef());
- dispatchers_[i]->Close();
- }
-}
-
-void LocalMessagePipeEndpoint::MessageQueueEntry::Init(
- MessageInTransit* message,
- std::vector<DispatcherTransport>* transports) {
- DCHECK(message);
- DCHECK(!transports || !transports->empty());
- DCHECK(!message_);
- DCHECK(dispatchers_.empty());
-
- message_ = message;
- if (transports) {
- dispatchers_.reserve(transports->size());
- for (size_t i = 0; i < transports->size(); i++) {
- if ((*transports)[i].is_valid()) {
- dispatchers_.push_back(
- (*transports)[i].CreateEquivalentDispatcherAndClose());
-
-#ifndef NDEBUG
- // It's important that we have "ownership" of these dispatchers. In
- // particular, they must not be in the global handle table (i.e., have
- // live handles referring to them). If we need to destroy any queued
- // messages, we need to know that any handles in them should be closed.
- DCHECK(dispatchers_[i]->HasOneRef());
-#endif
- } else {
- LOG(WARNING) << "Enqueueing null dispatcher";
- dispatchers_.push_back(scoped_refptr<Dispatcher>());
- }
- }
- }
-}
-
LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
: is_open_(true),
is_peer_open_(true) {
LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
DCHECK(!is_open_);
+ DCHECK(message_queue_.IsEmpty()); // Should be implied by not being open.
}
-void LocalMessagePipeEndpoint::Close() {
- DCHECK(is_open_);
- is_open_ = false;
- message_queue_.clear();
+MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const {
+ return kTypeLocal;
}
-void LocalMessagePipeEndpoint::OnPeerClose() {
+bool LocalMessagePipeEndpoint::OnPeerClose() {
DCHECK(is_open_);
DCHECK(is_peer_open_);
waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
new_satisfiable_flags);
}
+
+ return true;
}
-MojoResult LocalMessagePipeEndpoint::EnqueueMessage(
- MessageInTransit* message,
- std::vector<DispatcherTransport>* transports) {
+void LocalMessagePipeEndpoint::EnqueueMessage(
+ scoped_ptr<MessageInTransit> message) {
DCHECK(is_open_);
DCHECK(is_peer_open_);
- DCHECK(!transports || !transports->empty());
- bool was_empty = message_queue_.empty();
- // TODO(vtl): Use |emplace_back()| (and a suitable constructor, instead of
- // |Init()|) when that becomes available.
- message_queue_.push_back(MessageQueueEntry());
- message_queue_.back().Init(message, transports);
+ bool was_empty = message_queue_.IsEmpty();
+ message_queue_.AddMessage(message.Pass());
if (was_empty) {
waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
SatisfiableFlags());
}
+}
- return MOJO_RESULT_OK;
+void LocalMessagePipeEndpoint::Close() {
+ DCHECK(is_open_);
+ is_open_ = false;
+ message_queue_.Clear();
}
void LocalMessagePipeEndpoint::CancelAllWaiters() {
waiter_list_.CancelAllWaiters();
}
-MojoResult LocalMessagePipeEndpoint::ReadMessage(
- void* bytes, uint32_t* num_bytes,
- std::vector<scoped_refptr<Dispatcher> >* dispatchers,
- uint32_t* num_dispatchers,
- MojoReadMessageFlags flags) {
+MojoResult LocalMessagePipeEndpoint::ReadMessage(void* bytes,
+ uint32_t* num_bytes,
+ DispatcherVector* dispatchers,
+ uint32_t* num_dispatchers,
+ MojoReadMessageFlags flags) {
DCHECK(is_open_);
DCHECK(!dispatchers || dispatchers->empty());
const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
- if (message_queue_.empty()) {
+ if (message_queue_.IsEmpty()) {
return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT :
MOJO_RESULT_FAILED_PRECONDITION;
}
// TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
// and release the lock immediately.
bool enough_space = true;
- const MessageInTransit* queued_message = message_queue_.front().message();
+ MessageInTransit* message = message_queue_.PeekMessage();
if (num_bytes)
- *num_bytes = queued_message->num_bytes();
- if (queued_message->num_bytes() <= max_bytes)
- memcpy(bytes, queued_message->bytes(), queued_message->num_bytes());
+ *num_bytes = message->num_bytes();
+ if (message->num_bytes() <= max_bytes)
+ memcpy(bytes, message->bytes(), message->num_bytes());
else
enough_space = false;
- std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
- message_queue_.front().dispatchers();
- if (num_dispatchers)
- *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
- if (enough_space) {
- if (queued_dispatchers->empty()) {
- // Nothing to do.
- } else if (queued_dispatchers->size() <= max_num_dispatchers) {
- DCHECK(dispatchers);
- dispatchers->swap(*queued_dispatchers);
- } else {
- enough_space = false;
+ if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
+ if (num_dispatchers)
+ *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
+ if (enough_space) {
+ if (queued_dispatchers->empty()) {
+ // Nothing to do.
+ } else if (queued_dispatchers->size() <= max_num_dispatchers) {
+ DCHECK(dispatchers);
+ dispatchers->swap(*queued_dispatchers);
+ } else {
+ enough_space = false;
+ }
}
+ } else {
+ if (num_dispatchers)
+ *num_dispatchers = 0;
}
+ message = NULL;
+
if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
- message_queue_.pop_front();
+ message_queue_.DiscardMessage();
// Now it's empty, thus no longer readable.
- if (message_queue_.empty()) {
+ if (message_queue_.IsEmpty()) {
// It's currently not possible to wait for non-readability, but we should
// do the state change anyway.
waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() {
MojoWaitFlags satisfied_flags = 0;
- if (!message_queue_.empty())
+ if (!message_queue_.IsEmpty())
satisfied_flags |= MOJO_WAIT_FLAG_READABLE;
if (is_peer_open_)
satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE;
MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
MojoWaitFlags satisfiable_flags = 0;
- if (!message_queue_.empty() || is_peer_open_)
+ if (!message_queue_.IsEmpty() || is_peer_open_)
satisfiable_flags |= MOJO_WAIT_FLAG_READABLE;
if (is_peer_open_)
satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE;