Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / mojo / system / message_pipe.cc
index efb8c67..d12e303 100644 (file)
@@ -5,7 +5,7 @@
 #include "mojo/system/message_pipe.h"
 
 #include "base/logging.h"
-#include "mojo/system/channel.h"
+#include "mojo/system/channel_endpoint.h"
 #include "mojo/system/local_message_pipe_endpoint.h"
 #include "mojo/system/message_in_transit.h"
 #include "mojo/system/message_pipe_dispatcher.h"
 namespace mojo {
 namespace system {
 
-MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
-                         scoped_ptr<MessagePipeEndpoint> endpoint1) {
-  endpoints_[0].reset(endpoint0.release());
-  endpoints_[1].reset(endpoint1.release());
+// static
+MessagePipe* MessagePipe::CreateLocalLocal() {
+  MessagePipe* message_pipe = new MessagePipe();
+  message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
+  message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
+  return message_pipe;
 }
 
-MessagePipe::MessagePipe() {
-  endpoints_[0].reset(new LocalMessagePipeEndpoint());
-  endpoints_[1].reset(new LocalMessagePipeEndpoint());
+// static
+MessagePipe* MessagePipe::CreateLocalProxy(
+    scoped_refptr<ChannelEndpoint>* channel_endpoint) {
+  DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
+  MessagePipe* message_pipe = new MessagePipe();
+  message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
+  *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
+  message_pipe->endpoints_[1].reset(
+      new ProxyMessagePipeEndpoint(channel_endpoint->get()));
+  return message_pipe;
+}
+
+// static
+MessagePipe* MessagePipe::CreateProxyLocal(
+    scoped_refptr<ChannelEndpoint>* channel_endpoint) {
+  DCHECK(!channel_endpoint->get());  // Not technically wrong, but unlikely.
+  MessagePipe* message_pipe = new MessagePipe();
+  *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
+  message_pipe->endpoints_[0].reset(
+      new ProxyMessagePipeEndpoint(channel_endpoint->get()));
+  message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
+  return message_pipe;
 }
 
 // static
@@ -130,58 +151,37 @@ void MessagePipe::RemoveWaiter(unsigned port,
   endpoints_[port]->RemoveWaiter(waiter, signals_state);
 }
 
-void MessagePipe::ConvertLocalToProxy(unsigned port) {
+scoped_refptr<ChannelEndpoint> MessagePipe::ConvertLocalToProxy(unsigned port) {
   DCHECK(port == 0 || port == 1);
 
   base::AutoLock locker(lock_);
   DCHECK(endpoints_[port]);
   DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
 
-  bool is_peer_open = !!endpoints_[GetPeerPort(port)];
-
-  // TODO(vtl): Hopefully this will work if the peer has been closed and when
-  // the peer is local. If the peer is remote, we should do something more
-  // sophisticated.
-  DCHECK(!is_peer_open ||
-         endpoints_[GetPeerPort(port)]->GetType() ==
-             MessagePipeEndpoint::kTypeLocal);
-
-  scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
-      new ProxyMessagePipeEndpoint(
-          static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
-          is_peer_open));
-  endpoints_[port].swap(replacement_endpoint);
+  // TODO(vtl): Allowing this case is a temporary hack. It'll set up a
+  // |MessagePipe| with two proxy endpoints, which will then act as a proxy
+  // (rather than trying to connect the two ends directly).
+  DLOG_IF(WARNING,
+          !!endpoints_[GetPeerPort(port)] &&
+              endpoints_[GetPeerPort(port)]->GetType() !=
+                  MessagePipeEndpoint::kTypeLocal)
+      << "Direct message pipe passing across multiple channels not yet "
+         "implemented; will proxy";
+
+  scoped_ptr<MessagePipeEndpoint> old_endpoint(endpoints_[port].Pass());
+  scoped_refptr<ChannelEndpoint> channel_endpoint(
+      new ChannelEndpoint(this, port));
+  endpoints_[port].reset(new ProxyMessagePipeEndpoint(channel_endpoint.get()));
+  channel_endpoint->TakeMessages(static_cast<LocalMessagePipeEndpoint*>(
+                                     old_endpoint.get())->message_queue());
+  old_endpoint->Close();
+
+  return channel_endpoint;
 }
 
 MojoResult MessagePipe::EnqueueMessage(unsigned port,
                                        scoped_ptr<MessageInTransit> message) {
-  return EnqueueMessageInternal(port, message.Pass(), NULL);
-}
-
-bool MessagePipe::Attach(unsigned port,
-                         scoped_refptr<Channel> channel,
-                         MessageInTransit::EndpointId local_id) {
-  DCHECK(port == 0 || port == 1);
-  DCHECK(channel);
-  DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
-
-  base::AutoLock locker(lock_);
-  if (!endpoints_[port])
-    return false;
-
-  DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
-  endpoints_[port]->Attach(channel, local_id);
-  return true;
-}
-
-void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
-  DCHECK(port == 0 || port == 1);
-  DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
-
-  base::AutoLock locker(lock_);
-  DCHECK(endpoints_[port]);
-  if (!endpoints_[port]->Run(remote_id))
-    endpoints_[port].reset();
+  return EnqueueMessageInternal(port, message.Pass(), nullptr);
 }
 
 void MessagePipe::OnRemove(unsigned port) {
@@ -192,7 +192,6 @@ void MessagePipe::OnRemove(unsigned port) {
   if (!endpoints_[port])
     return;
 
-  endpoints_[port]->OnRemove();
   if (endpoints_[destination_port]) {
     if (!endpoints_[destination_port]->OnPeerClose())
       endpoints_[destination_port].reset();
@@ -200,6 +199,9 @@ void MessagePipe::OnRemove(unsigned port) {
   endpoints_[port].reset();
 }
 
+MessagePipe::MessagePipe() {
+}
+
 MessagePipe::~MessagePipe() {
   // Owned by the dispatchers. The owning dispatchers should only release us via
   // their |Close()| method, which should inform us of being closed via our