#include "base/logging.h"
#include "mojo/system/channel.h"
+#include "mojo/system/channel_endpoint.h"
#include "mojo/system/constants.h"
#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/memory.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe.h"
+#include "mojo/system/options_validation.h"
#include "mojo/system/proxy_message_pipe_endpoint.h"
namespace mojo {
// MessagePipeDispatcher -------------------------------------------------------
-MessagePipeDispatcher::MessagePipeDispatcher()
+// static
+const MojoCreateMessagePipeOptions
+ MessagePipeDispatcher::kDefaultCreateOptions = {
+ static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions)),
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE};
+
+MessagePipeDispatcher::MessagePipeDispatcher(
+ const MojoCreateMessagePipeOptions& /*validated_options*/)
: port_(kInvalidPort) {
}
+// static
+MojoResult MessagePipeDispatcher::ValidateCreateOptions(
+ UserPointer<const MojoCreateMessagePipeOptions> in_options,
+ MojoCreateMessagePipeOptions* out_options) {
+ const MojoCreateMessagePipeOptionsFlags kKnownFlags =
+ MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE;
+
+ *out_options = kDefaultCreateOptions;
+ if (in_options.IsNull())
+ return MOJO_RESULT_OK;
+
+ UserOptionsReader<MojoCreateMessagePipeOptions> reader(in_options);
+ if (!reader.is_valid())
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions, flags, reader))
+ return MOJO_RESULT_OK;
+ if ((reader.options().flags & ~kKnownFlags))
+ return MOJO_RESULT_UNIMPLEMENTED;
+ out_options->flags = reader.options().flags;
+
+ // Checks for fields beyond |flags|:
+
+ // (Nothing here yet.)
+
+ return MOJO_RESULT_OK;
+}
+
void MessagePipeDispatcher::Init(scoped_refptr<MessagePipe> message_pipe,
unsigned port) {
DCHECK(message_pipe.get());
}
// static
-std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
-MessagePipeDispatcher::CreateRemoteMessagePipe() {
+scoped_refptr<MessagePipeDispatcher>
+MessagePipeDispatcher::CreateRemoteMessagePipe(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint) {
scoped_refptr<MessagePipe> message_pipe(
- new MessagePipe(
- scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
- scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
- scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher());
+ MessagePipe::CreateLocalProxy(channel_endpoint));
+ scoped_refptr<MessagePipeDispatcher> dispatcher(
+ new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
dispatcher->Init(message_pipe, 0);
-
- return std::make_pair(dispatcher, message_pipe);
+ return dispatcher;
}
// static
return scoped_refptr<MessagePipeDispatcher>();
}
- std::pair<scoped_refptr<MessagePipeDispatcher>, scoped_refptr<MessagePipe> >
- remote_message_pipe = CreateRemoteMessagePipe();
+ scoped_refptr<ChannelEndpoint> channel_endpoint;
+ scoped_refptr<MessagePipeDispatcher> dispatcher =
+ CreateRemoteMessagePipe(&channel_endpoint);
MessageInTransit::EndpointId remote_id =
static_cast<const SerializedMessagePipeDispatcher*>(source)->endpoint_id;
return scoped_refptr<MessagePipeDispatcher>();
}
MessageInTransit::EndpointId local_id =
- channel->AttachMessagePipeEndpoint(remote_message_pipe.second, 1);
+ channel->AttachEndpoint(channel_endpoint);
if (local_id == MessageInTransit::kInvalidEndpointId) {
LOG(ERROR) << "Failed to deserialize message pipe dispatcher (failed to "
"attach; remote ID = " << remote_id << ")";
return scoped_refptr<MessagePipeDispatcher>();
}
- DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = "
- << remote_id << ", new local ID = " << local_id << ")";
+ DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id
+ << ", new local ID = " << local_id << ")";
if (!channel->RunMessagePipeEndpoint(local_id, remote_id)) {
// In general, this shouldn't fail, since we generated |local_id| locally.
// TODO(vtl): FIXME -- Need some error handling here.
channel->RunRemoteMessagePipeEndpoint(local_id, remote_id);
- return remote_message_pipe.first;
+ return dispatcher;
}
MessagePipeDispatcher::~MessagePipeDispatcher() {
void MessagePipeDispatcher::CloseImplNoLock() {
lock().AssertAcquired();
message_pipe_->Close(port_);
- message_pipe_ = NULL;
+ message_pipe_ = nullptr;
port_ = kInvalidPort;
}
MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
lock().AssertAcquired();
- scoped_refptr<MessagePipeDispatcher> rv = new MessagePipeDispatcher();
+ // TODO(vtl): Currently, there are no options, so we just use
+ // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
+ // too.
+ scoped_refptr<MessagePipeDispatcher> rv =
+ new MessagePipeDispatcher(kDefaultCreateOptions);
rv->Init(message_pipe_, port_);
- message_pipe_ = NULL;
+ message_pipe_ = nullptr;
port_ = kInvalidPort;
return scoped_refptr<Dispatcher>(rv.get());
}
MojoResult MessagePipeDispatcher::WriteMessageImplNoLock(
- const void* bytes,
+ UserPointer<const void> bytes,
uint32_t num_bytes,
std::vector<DispatcherTransport>* transports,
MojoWriteMessageFlags flags) {
lock().AssertAcquired();
- if (!VerifyUserPointer<void>(bytes, num_bytes))
- return MOJO_RESULT_INVALID_ARGUMENT;
if (num_bytes > kMaxMessageNumBytes)
return MOJO_RESULT_RESOURCE_EXHAUSTED;
- return message_pipe_->WriteMessage(port_, bytes, num_bytes, transports,
- flags);
+ return message_pipe_->WriteMessage(
+ port_, bytes, num_bytes, transports, flags);
}
MojoResult MessagePipeDispatcher::ReadMessageImplNoLock(
- void* bytes,
- uint32_t* num_bytes,
+ UserPointer<void> bytes,
+ UserPointer<uint32_t> num_bytes,
DispatcherVector* dispatchers,
uint32_t* num_dispatchers,
MojoReadMessageFlags flags) {
lock().AssertAcquired();
+ return message_pipe_->ReadMessage(
+ port_, bytes, num_bytes, dispatchers, num_dispatchers, flags);
+}
- if (num_bytes) {
- if (!VerifyUserPointer<uint32_t>(num_bytes, 1))
- return MOJO_RESULT_INVALID_ARGUMENT;
- if (!VerifyUserPointer<void>(bytes, *num_bytes))
- return MOJO_RESULT_INVALID_ARGUMENT;
- }
-
- return message_pipe_->ReadMessage(port_, bytes, num_bytes, dispatchers,
- num_dispatchers, flags);
+HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
+ const {
+ lock().AssertAcquired();
+ return message_pipe_->GetHandleSignalsState(port_);
}
-MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(Waiter* waiter,
- MojoWaitFlags flags,
- MojoResult wake_result) {
+MojoResult MessagePipeDispatcher::AddWaiterImplNoLock(
+ Waiter* waiter,
+ MojoHandleSignals signals,
+ uint32_t context,
+ HandleSignalsState* signals_state) {
lock().AssertAcquired();
- return message_pipe_->AddWaiter(port_, waiter, flags, wake_result);
+ return message_pipe_->AddWaiter(
+ port_, waiter, signals, context, signals_state);
}
-void MessagePipeDispatcher::RemoveWaiterImplNoLock(Waiter* waiter) {
+void MessagePipeDispatcher::RemoveWaiterImplNoLock(
+ Waiter* waiter,
+ HandleSignalsState* signals_state) {
lock().AssertAcquired();
- message_pipe_->RemoveWaiter(port_, waiter);
+ message_pipe_->RemoveWaiter(port_, waiter, signals_state);
}
void MessagePipeDispatcher::StartSerializeImplNoLock(
Channel* channel,
void* destination,
size_t* actual_size,
- std::vector<embedder::PlatformHandle>* platform_handles) {
+ embedder::PlatformHandleVector* /*platform_handles*/) {
DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
- // Convert the local endpoint to a proxy endpoint (moving the message queue).
- message_pipe_->ConvertLocalToProxy(port_);
-
- // Attach the new proxy endpoint to the channel.
+ // Convert the local endpoint to a proxy endpoint (moving the message queue)
+ // and attach it to the channel.
MessageInTransit::EndpointId endpoint_id =
- channel->AttachMessagePipeEndpoint(message_pipe_, port_);
+ channel->AttachEndpoint(message_pipe_->ConvertLocalToProxy(port_));
// Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
// possible that the other endpoint -- the one that we're not sending -- was
// closed in the intervening time.) In that case, we need to deserialize a
static_cast<SerializedMessagePipeDispatcher*>(destination)->endpoint_id =
endpoint_id;
- message_pipe_ = NULL;
+ message_pipe_ = nullptr;
port_ = kInvalidPort;
*actual_size = sizeof(SerializedMessagePipeDispatcher);
// MessagePipeDispatcherTransport ----------------------------------------------
MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
- DispatcherTransport transport) : DispatcherTransport(transport) {
+ DispatcherTransport transport)
+ : DispatcherTransport(transport) {
DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe);
}