Use delegate port to improve concurrency 16/182016/5
authorJunghoon Park <jh9216.park@samsung.com>
Wed, 20 Jun 2018 01:03:34 +0000 (10:03 +0900)
committerJunghoon Park <jh9216.park@samsung.com>
Thu, 21 Jun 2018 00:14:03 +0000 (09:14 +0900)
Change-Id: Id3b2bb7e452e6ebc32bfefe99a0a65a0c4f992e0
Signed-off-by: Junghoon Park <jh9216.park@samsung.com>
src/port-internal.cc
src/port-internal.h
src/proxy-internal.cc
src/proxy-internal.h
src/stub-internal.cc
src/stub-internal.h
unit_tests/src/rpc_port_test.cc

index 23432aa..52e9128 100644 (file)
@@ -41,8 +41,8 @@
 namespace rpc_port {
 namespace internal {
 
-Port::Port(int fd, const std::string& id)
-    : fd_(fd), id_(id) {
+Port::Port(int fd, std::string id)
+    : fd_(fd), id_(std::move(id)) {
   char uuid[37];
   uuid_t u;
   uuid_generate(u);
@@ -50,6 +50,9 @@ Port::Port(int fd, const std::string& id)
   instance_ = std::string(uuid) + ":" + id;
 }
 
+Port::Port(int fd, std::string id, std::string instance)
+    : fd_(fd), id_(std::move(id)), instance_(std::move(instance)) {}
+
 Port::~Port() {
   close(fd_);
 }
index 9d52ae2..2c0bb71 100644 (file)
@@ -27,7 +27,8 @@ namespace internal {
 
 class Port {
  public:
-  Port(int fd, const std::string& id);
+  Port(int fd, std::string id, std::string instance);
+  Port(int fd, std::string id);
   virtual ~Port();
 
   int Read(void* buf, unsigned int size);
index bdda25c..2b6c6c6 100644 (file)
@@ -67,12 +67,12 @@ gboolean Proxy::OnDataReceived(GIOChannel *gio, GIOCondition cond,
   int fd = g_io_channel_unix_get_fd(gio);
   char buffer[4];
 
-  if (proxy->main_port_.get()->GetFd() == fd) {
+  if (proxy->delegate_port_.get()->GetFd() == fd) {
     if (recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0) {
       LOGW("Socket was disconnected by stub. fd(%d)", fd);
       IEventListener* listener = proxy->listener_;
       proxy->listener_ = nullptr;
-      proxy->main_port_.get()->SetSource(0);
+      proxy->delegate_port_.get()->SetSource(0);
       if (listener)
         listener->OnDisconnected(proxy->target_appid_);
       return FALSE;
@@ -110,7 +110,7 @@ void Proxy::OnPortAppeared(const std::string& appid,
   }
 
   LOGW("[__OnPortAppeared__] fds[0]: %d, fds[1]: %d", fds[0], fds[1]);
-  main_port_.reset(new ProxyPort(this, fds[0], appid));
+  main_port_.reset(new ProxyPort(this, fds[0], appid, false));
   delegate_port_.reset(new ProxyPort(this, fds[1], appid));
   listener_->OnConnected(appid, main_port_.get());
 }
@@ -145,9 +145,9 @@ int Proxy::Connect(const std::string appid, const std::string& port_name,
   return RPC_PORT_ERROR_NONE;
 }
 
-Proxy::ProxyPort::ProxyPort(Proxy* parent, int fd, const std::string& id)
-    : Port(fd, id), parent_(parent) {
-  Watch();
+Proxy::ProxyPort::ProxyPort(Proxy* parent, int fd, const std::string& id,
+    bool receive) : Port(fd, id), parent_(parent) {
+  Watch(receive);
 }
 
 Proxy::ProxyPort::~ProxyPort() {
@@ -161,7 +161,7 @@ Proxy::ProxyPort::~ProxyPort() {
     g_io_channel_unref(gioc_);
 }
 
-int Proxy::ProxyPort::Watch() {
+int Proxy::ProxyPort::Watch(bool receive) {
   char buf[1024];
   int fd = GetFd();
 
@@ -181,6 +181,9 @@ int Proxy::ProxyPort::Watch() {
     return -1;
   }
 
+  if (!receive)
+    return 0;
+
   src_ = g_io_add_watch(gioc_,
       (GIOCondition)(G_IO_IN),
       Proxy::OnDataReceived, parent_);
index b053d84..f5768ed 100644 (file)
@@ -59,13 +59,13 @@ class Proxy : public FdBroker::IEventWatcher {
  private:
   class ProxyPort : public Port {
    public:
-    ProxyPort(Proxy* parent, int fd, const std::string& id);
+    ProxyPort(Proxy* parent, int fd, const std::string& id, bool receive = true);
     virtual ~ProxyPort();
     void SetDisconnectedSource(int sourceId);
     void SetSource(int sourceId);
 
    private:
-    int Watch();
+    int Watch(bool receive);
 
    private:
     GIOChannel* gioc_ = nullptr;
index b866b45..696eb6b 100644 (file)
@@ -81,7 +81,6 @@ std::shared_ptr<Port> Stub::FindDelegatePort(
   return {};
 }
 
-
 gboolean Stub::OnDataReceived(GIOChannel *gio, GIOCondition cond,
                               gpointer data) {
   Stub* stub = static_cast<Stub*>(data);
@@ -145,8 +144,10 @@ gboolean Stub::OnSocketDisconnected(GIOChannel *gio, GIOCondition cond,
 
 void Stub::OnFdReceived(const std::string& sender, int fds[2]) {
   LOGW("[__OnFdReceived__] fds[0]: %d, fds[1]: %d", fds[0], fds[1]);
-  ports_.emplace_back(new AcceptedPort(this, false, fds[0], sender));
-  ports_.emplace_back(new AcceptedPort(this, true, fds[1], sender));
+  auto* main_port = new AcceptedPort(this, false, fds[0], sender, true);
+  ports_.emplace_back(main_port);
+  ports_.emplace_back(new AcceptedPort(this, true, fds[1], sender,
+      main_port->GetInstance(), false));
   for (auto& p : ports_) {
     if (p->GetFd() == fds[0]) {
       listener_->OnConnected(p->GetId(), p->GetInstance());
@@ -155,10 +156,18 @@ void Stub::OnFdReceived(const std::string& sender, int fds[2]) {
   }
 }
 
-Stub::AcceptedPort::AcceptedPort(Stub* parent, bool isDelegate,
-                                 int fd, const std::string& id)
-    : Port(fd, id), parent_(parent), is_delegate_(isDelegate) {
-  Watch();
+Stub::AcceptedPort::AcceptedPort(Stub* parent, bool isDelegate, int fd,
+    std::string id, std::string inst, bool watch)
+    : Port(fd, std::move(id), std::move(inst)), parent_(parent),
+      is_delegate_(isDelegate) {
+  Watch(watch);
+}
+
+Stub::AcceptedPort::AcceptedPort(Stub* parent, bool isDelegate, int fd,
+    std::string id, bool watch)
+    : Port(fd, std::move(id)), parent_(parent),
+      is_delegate_(isDelegate) {
+  Watch(watch);
 }
 
 Stub::AcceptedPort::~AcceptedPort() {
@@ -172,7 +181,7 @@ Stub::AcceptedPort::~AcceptedPort() {
     g_io_channel_unref(gioc_);
 }
 
-int Stub::AcceptedPort::Watch() {
+int Stub::AcceptedPort::Watch(bool receive) {
   char buf[1024];
   int fd = GetFd();
 
@@ -192,6 +201,9 @@ int Stub::AcceptedPort::Watch() {
     return -1;
   }
 
+  if (!receive)
+    return 0;
+
   src_ = g_io_add_watch(gioc_,
                         (GIOCondition)(G_IO_IN),
                         Stub::OnDataReceived, parent_);
index 05ff1b2..e3408f2 100644 (file)
@@ -55,14 +55,17 @@ class Stub : private FdBroker::IEventListener {
  private:
   class AcceptedPort : public Port {
    public:
-    AcceptedPort(Stub* parent, bool isDelegate, int fd, const std::string& id);
+    AcceptedPort(Stub* parent, bool isDelegate, int fd, std::string id,
+        std::string inst, bool receive);
+    AcceptedPort(Stub* parent, bool isDelegate, int fd, std::string id,
+        bool receive);
     virtual ~AcceptedPort();
     bool IsDelegate() const {
       return is_delegate_;
     }
 
    private:
-    int Watch();
+    int Watch(bool receive);
 
    private:
     GIOChannel* gioc_ = nullptr;
index 11d6eaf..dd2dc87 100644 (file)
@@ -105,6 +105,9 @@ class RpcPortConnection : public RpcPortBase {
            rpc_port_h port, void *data) -> int {
           RpcPortConnection* p = static_cast<RpcPortConnection*>(data);
           p->stub_port_ = port;
+          rpc_port_stub_get_port(p->stub_handle_, RPC_PORT_PORT_CALLBACK,
+              instance, &p->stub_callback_port_);
+
           p->Finish();
           return 0;
         }, this);
@@ -131,6 +134,8 @@ class RpcPortConnection : public RpcPortBase {
         [](const char *ep, const char *port_name, rpc_port_h port, void *data) {
           RpcPortConnection* p = static_cast<RpcPortConnection*>(data);
           p->proxy_port_ = port;
+          rpc_port_proxy_get_port(p->proxy_handle_, RPC_PORT_PORT_CALLBACK,
+              &p->proxy_callback_port_);
         }, this);
     ASSERT_EQ(ret, 0);
 
@@ -155,7 +160,9 @@ class RpcPortConnection : public RpcPortBase {
   }
 
   rpc_port_h proxy_port_ = nullptr;
+  rpc_port_h proxy_callback_port_ = nullptr;
   rpc_port_h stub_port_ = nullptr;
+  rpc_port_h stub_callback_port_ = nullptr;
   bool touch_proxy_disconnected_event_cb_ = false;
   bool touch_stub_disconnected_event_cb_ = false;
   bool touch_proxy_received_event_cb_ = false;
@@ -216,23 +223,26 @@ TEST_F(RpcPortConnection, rpc_port_proxy_event_receive) {
 
   RunMainLoop();
   ASSERT_NE(stub_port_, nullptr);
+  ASSERT_NE(stub_callback_port_, nullptr);
 
-  ret = rpc_port_write(stub_port_, res, sizeof(res));
+  ret = rpc_port_write(stub_callback_port_, res, sizeof(res));
   ASSERT_EQ(ret, 0);
+
   RunMainLoop();
   ASSERT_TRUE(touch_proxy_received_event_cb_);
 
-  ret = rpc_port_read(proxy_port_, r_buf, sizeof(res));
+  ret = rpc_port_read(proxy_callback_port_, r_buf, sizeof(res));
   ASSERT_EQ(ret, 0);
   ASSERT_STREQ(res, r_buf);
 
   touch_proxy_received_event_cb_ = false;
-  ret = rpc_port_write(stub_port_, res, sizeof(res));
+  ret = rpc_port_write(stub_callback_port_, res, sizeof(res));
   ASSERT_EQ(ret, 0);
+
   RunMainLoop();
   ASSERT_TRUE(touch_proxy_received_event_cb_);
 
-  ret = rpc_port_read(proxy_port_, r_buf, sizeof(res));
+  ret = rpc_port_read(proxy_callback_port_, r_buf, sizeof(res));
   ASSERT_EQ(ret, 0);
   ASSERT_STREQ(res, r_buf);
 }