#include "mojo/system/message_pipe.h"
#include "base/logging.h"
-#include "mojo/system/channel.h"
+#include "mojo/system/channel_endpoint.h"
#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe_dispatcher.h"
namespace mojo {
namespace system {
-MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
- scoped_ptr<MessagePipeEndpoint> endpoint1) {
- endpoints_[0].reset(endpoint0.release());
- endpoints_[1].reset(endpoint1.release());
+// static
+MessagePipe* MessagePipe::CreateLocalLocal() {
+ MessagePipe* message_pipe = new MessagePipe();
+ message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
+ message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
+ return message_pipe;
}
-MessagePipe::MessagePipe() {
- endpoints_[0].reset(new LocalMessagePipeEndpoint());
- endpoints_[1].reset(new LocalMessagePipeEndpoint());
+// static
+MessagePipe* MessagePipe::CreateLocalProxy(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint) {
+ DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
+ MessagePipe* message_pipe = new MessagePipe();
+ message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
+ *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
+ message_pipe->endpoints_[1].reset(
+ new ProxyMessagePipeEndpoint(channel_endpoint->get()));
+ return message_pipe;
+}
+
+// static
+MessagePipe* MessagePipe::CreateProxyLocal(
+ scoped_refptr<ChannelEndpoint>* channel_endpoint) {
+ DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
+ MessagePipe* message_pipe = new MessagePipe();
+ *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
+ message_pipe->endpoints_[0].reset(
+ new ProxyMessagePipeEndpoint(channel_endpoint->get()));
+ message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
+ return message_pipe;
}
// static
endpoints_[port]->RemoveWaiter(waiter, signals_state);
}
-void MessagePipe::ConvertLocalToProxy(unsigned port) {
+scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
DCHECK(endpoints_[port]);
DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
- bool is_peer_open = !!endpoints_[GetPeerPort(port)];
-
- // TODO(vtl): Hopefully this will work if the peer has been closed and when
- // the peer is local. If the peer is remote, we should do something more
- // sophisticated.
- DCHECK(!is_peer_open ||
- endpoints_[GetPeerPort(port)]->GetType() ==
- MessagePipeEndpoint::kTypeLocal);
-
- scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
- new ProxyMessagePipeEndpoint(
- static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
- is_peer_open));
- endpoints_[port].swap(replacement_endpoint);
+ // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
+ // |MessagePipe| with two proxy endpoints, which will then act as a proxy
+ // (rather than trying to connect the two ends directly).
+ DLOG_IF(WARNING,
+ !!endpoints_[GetPeerPort(port)] &&
+ endpoints_[GetPeerPort(port)]->GetType() !=
+ MessagePipeEndpoint::kTypeLocal)
+ << "Direct message pipe passing across multiple channels not yet "
+ "implemented; will proxy";
+
+ scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
+ scoped_refptr<ChannelEndpoint> channel_endpoint(
+ new ChannelEndpoint(this, port));
+ endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
+ channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>(
+ old_endpoint.get())->message_queue());
+ old_endpoint->Close();
+
+ return channel_endpoint;
}
MojoResult MessagePipe::EnqueueMessage(unsigned port,
scoped_ptr<MessageInTransit> message) {
- return EnqueueMessageInternal(port, message.Pass(), NULL);
-}
-
-bool MessagePipe::Attach(unsigned port,
- scoped_refptr<Channel> channel,
- MessageInTransit::EndpointId local_id) {
- DCHECK(port == 0 || port == 1);
- DCHECK(channel);
- DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
-
- base::AutoLock locker(lock_);
- if (!endpoints_[port])
- return false;
-
- DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
- endpoints_[port]->Attach(channel, local_id);
- return true;
-}
-
-void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
- DCHECK(port == 0 || port == 1);
- DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port]);
- if (!endpoints_[port]->Run(remote_id))
- endpoints_[port].reset();
+ return EnqueueMessageInternal(port, message.Pass(), nullptr);
}
void MessagePipe::OnRemove(unsigned port) {
if (!endpoints_[port])
return;
- endpoints_[port]->OnRemove();
if (endpoints_[destination_port]) {
if (!endpoints_[destination_port]->OnPeerClose())
endpoints_[destination_port].reset();
endpoints_[port].reset();
}
+MessagePipe::MessagePipe() {
+}
+
MessagePipe::~MessagePipe() {
// Owned by the dispatchers. The owning dispatchers should only release us via
// their |Close()| method, which should inform us of being closed via our