Implement rpc-port APIs 57/162557/11
authorJunghoon Park <jh9216.park@samsung.com>
Mon, 4 Dec 2017 07:59:41 +0000 (16:59 +0900)
committerJunghoon Park <jh9216.park@samsung.com>
Tue, 5 Dec 2017 03:08:44 +0000 (12:08 +0900)
Change-Id: I20443cce9b0396bd9fdcd46f99f24f1bf394a679
Signed-off-by: Junghoon Park <jh9216.park@samsung.com>
include/rpc-port-internal.h [new file with mode: 0644]
include/rpc-port.h
src/fdbroker-internal.cc
src/fdbroker-internal.h
src/proxy-internal.cc
src/proxy-internal.h
src/rpc-port.cc
src/stub-internal.cc
src/stub-internal.h
unit_tests/src/rpc_port_test.cc [new file with mode: 0644]

diff --git a/include/rpc-port-internal.h b/include/rpc-port-internal.h
new file mode 100644 (file)
index 0000000..1f7db12
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RPC_PORT_INTERNAL_INCLUDE_H__
+#define __RPC_PORT_INTERNAL_INCLUDE_H__
+
+#include <rpc-port.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int rpc_port_proxy_create_mockup(rpc_port_proxy_h *h);
+int rpc_port_stub_create_mockup(rpc_port_stub_h *h, const char *port_name);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __RPC_PORT_INTERNAL_INCLUDE_H__ */
+
+
+
index b66e677..efdae49 100755 (executable)
@@ -23,8 +23,6 @@ extern "C" {
 
 /* common */
 typedef void *rpc_port_h;
-int rpc_port_open(int fd, rpc_port_h *h);
-int rpc_port_close(rpc_port_h h);
 int rpc_port_read(rpc_port_h h, void *buf, unsigned int size);
 int rpc_port_write(rpc_port_h h, const void *buf, unsigned int size);
 
@@ -63,7 +61,7 @@ int rpc_port_stub_add_connected_event_cb(rpc_port_stub_h h,
                rpc_port_stub_connected_event_cb cb, void *data);
 int rpc_port_stub_add_disconnected_event_cb(rpc_port_stub_h h,
                rpc_port_stub_disconnected_event_cb cb, void *data);
-int rpc_port_stub_add_recevied_event_cb(rpc_port_stub_h h,
+int rpc_port_stub_add_received_event_cb(rpc_port_stub_h h,
                rpc_port_stub_received_event_cb cb, void *data);
 
 #ifdef __cplusplus
index 8fce446..b65bdb6 100644 (file)
@@ -18,6 +18,8 @@
 #define _GNU_SOURCE
 #endif
 
+#include <sys/types.h>
+#include <sys/socket.h>
 #include <aul.h>
 #include <dlog.h>
 
@@ -88,7 +90,36 @@ void FdBroker::DBusConnectionManager::Fini() {
   disposed_ = true;
 }
 
-FdBroker::SocketPair::SocketPair() {
+FdBroker::DBusMock& FdBroker::DBusMock::GetInst() {
+  static DBusMock mock;
+
+  return mock;
+}
+
+int FdBroker::DBusMock::Send(const std::string& sender,
+                              const std::string& port, int fd) {
+  if (ports_.find(port) == ports_.end())
+    return -1;
+
+  ports_[port]->OnFdReceived(sender, fd);
+  return 0;
+}
+
+int FdBroker::DBusMock::AddListener(const std::string& port,
+                                    FdBroker::IEventListener* listener) {
+  if (ports_.find(port) != ports_.end())
+    return -1;
+
+  ports_[port] = listener;
+  return 0;
+}
+
+void FdBroker::DBusMock::Dispose() {
+  ports_.clear();
+}
+
+FdBroker::SocketPair::SocketPair(bool mock)
+    : mock_(mock) {
   socks_[SENDER] = 0;
   socks_[RECEIVER] = 0;
 }
@@ -101,6 +132,10 @@ FdBroker::SocketPair::~SocketPair() {
 }
 
 int FdBroker::SocketPair::Request() {
+  if (mock_) {
+    return socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, socks_);
+  }
+
   if (aul_request_message_port_socket_pair(socks_) != AUL_R_OK) {
     LOGE("error create socket pair");
     return -1;
@@ -153,6 +188,10 @@ FdBroker::~FdBroker() {
         DBusConnectionManager::GetInst().GetConnection(),
         registration_id_);
   }
+
+  if (mock_) {
+    DBusMock::GetInst().Dispose();
+  }
 }
 
 std::string FdBroker::GetInterfaceName(const std::string& target_appid,
@@ -171,16 +210,28 @@ int FdBroker::Send(const std::string& target_appid,
   GDBusMessage *msg;
   GError *err = nullptr;
   GVariant *body;
-  SocketPair sock_pair;
+  SocketPair sock_pair(mock_);
   FdList fd_list;
   char sender_appid[255];
 
-  if (aul_app_get_appid_bypid(getpid(), sender_appid, sizeof(sender_appid)) < 0)
+  if (!mock_ && aul_app_get_appid_bypid(getpid(),
+       sender_appid, sizeof(sender_appid)) < 0) {
+    LOGE("Can't get appid");
     return -1;
+  }
 
   if (sock_pair.Request() != 0)
     return -1;
 
+  if (mock_) {
+    int ret = DBusMock::GetInst().Send("TestApp", port_name,
+                             sock_pair.Detach(SocketPair::RECEIVER));
+    if (ret < 0)
+      return ret;
+
+    return sock_pair.Detach(SocketPair::SENDER);
+  }
+
   if (fd_list.Add(sock_pair.Detach(SocketPair::RECEIVER)) != 0)
     return -1;
 
@@ -358,6 +409,15 @@ int FdBroker::Listen(IEventListener* ev, const std::string& port_name) {
     return -1;
   }
 
+  if (mock_) {
+    int ret = DBusMock::GetInst().AddListener(port_name, ev);
+
+    if (ret < 0)
+      return ret;
+
+    return 0;
+  }
+
   int ret = RegisterDbusInterface(port_name);
 
   if (ret != 0) {
index 9d2cb85..8ae7451 100644 (file)
@@ -23,6 +23,7 @@
 #include <glib-unix.h>
 
 #include <string>
+#include <map>
 #include <memory>
 
 namespace rpc_port {
@@ -35,7 +36,7 @@ class FdBroker {
     virtual void OnFdReceived(const std::string& sender, int fd) = 0;
   };
 
-  FdBroker() = default;
+  FdBroker(bool mock = false) : mock_(mock) {}
   ~FdBroker();
 
   static void Dispose() {
@@ -67,6 +68,25 @@ class FdBroker {
     GDBusConnection* gdbus_conn_ = nullptr;
   };
 
+  class DBusMock {
+   public:
+    DBusMock(const DBusMock&) = delete;
+    DBusMock& operator = (const DBusMock&) = delete;
+
+    static DBusMock& GetInst();
+    int Send(const std::string& sender, const std::string& port, int fd);
+    int AddListener(const std::string& port,
+                    FdBroker::IEventListener* listener);
+    void Dispose();
+
+   private:
+    DBusMock() = default;
+    ~DBusMock() = default;
+
+   private:
+    std::map<std::string, FdBroker::IEventListener*> ports_;
+  };
+
   class SocketPair {
    public:
     enum Type {
@@ -74,15 +94,17 @@ class FdBroker {
       RECEIVER = 1
     };
 
-    SocketPair();
+    SocketPair(bool mock = false);
     ~SocketPair();
 
     int Request();
+    void RequestMock();
     int Get(Type t) const;
     int Detach(Type t);
 
    private:
     int socks_[2];
+    bool mock_;
   };
 
   class FdList {
@@ -114,6 +136,7 @@ class FdBroker {
  private:
   IEventListener* listener_ = nullptr;
   int registration_id_ = 0;
+  bool mock_;
 };
 
 }  // namespace internal
index 6cabd91..3929df4 100644 (file)
@@ -31,6 +31,9 @@
 namespace rpc_port {
 namespace internal {
 
+Proxy::Proxy(bool mock)
+    : fd_broker_(mock) {}
+
 Proxy::~Proxy() {
   if (src_id_ > 0)
     g_source_remove(src_id_);
@@ -81,6 +84,7 @@ void Proxy::Connect(const std::string appid, const std::string& port_name,
 
   listener_ = ev;
   target_appid_ = appid;
+  port_name_ = port_name;
   int fd = fd_broker_.Send(appid, port_name);
 
   if (fd <= 0) {
index caa683e..2cb2c05 100644 (file)
@@ -32,7 +32,8 @@ namespace internal {
 
 class Proxy {
  public:
-  ~Proxy();
+  Proxy(bool mock = false);
+  virtual ~Proxy();
 
   class IEventListener {
    public:
@@ -47,12 +48,17 @@ class Proxy {
     return port_;
   }
 
+  const std::string& GetPortName() {
+    return port_name_;
+  }
+
  private:
   static gboolean OnSocketDisconnected(GIOChannel *gio, GIOCondition cond,
                                        gpointer data);
   int Watch(int fd);
 
  private:
+  std::string port_name_;
   std::shared_ptr<Port> port_;
   IEventListener* listener_ = nullptr;
   FdBroker fd_broker_;
index 4292359..a0372fa 100755 (executable)
 #include <glib.h>
 
 #include "rpc-port.h"
+#include "port-internal.h"
+#include "proxy-internal.h"
+#include "stub-internal.h"
 
 #undef RPC_API
 #define RPC_API extern "C" __attribute__((visibility("default")))
 
-RPC_API int rpc_port_open(int fd, rpc_port_h* h) {
-  return 0;
-}
+namespace {
+using namespace rpc_port::internal;
 
-RPC_API int rpc_port_close(rpc_port_h h) {
-  return 0;
-}
+template<typename T>
+class Event {
+ public:
+  Event(T cb, void* user_data)
+      : cb_(cb), user_data_(user_data) {}
+
+  T cb_;
+  void* user_data_;
+};
+
+class ProxyExt : public Proxy, public Proxy::IEventListener {
+ public:
+  ProxyExt(bool mock = false) : Proxy(mock) {}
+  virtual ~ProxyExt() = default;
+
+  void AddConnectedEventListener(rpc_port_proxy_connected_event_cb cb,
+                            void* user_data) {
+    connected_events_.emplace_back(
+        new Event<rpc_port_proxy_connected_event_cb>(cb, user_data));
+  }
+
+  void AddDisconnectedEventListener(rpc_port_proxy_disconnected_event_cb cb,
+                                    void* user_data) {
+    disconnected_events_.emplace_back(
+        new Event<rpc_port_proxy_disconnected_event_cb>(cb, user_data));
+  }
+
+  void AddRejectedEventListener(rpc_port_proxy_rejected_event_cb cb,
+                                void* user_data) {
+    rejected_events_.emplace_back(
+        new Event<rpc_port_proxy_rejected_event_cb>(cb, user_data));
+  }
+
+  void OnConnected(const std::string& endpoint, Port& port) override {
+    for (auto& ev : connected_events_) {
+      ev->cb_(endpoint.c_str(), GetPortName().c_str(), &port,
+              ev->user_data_);
+    }
+  }
+
+  void OnDisconnected(const std::string& endpoint) override {
+    for (auto& ev : disconnected_events_) {
+      ev->cb_(endpoint.c_str(), GetPortName().c_str(), ev->user_data_);
+    }
+  }
+
+  void OnRejected(const std::string& endpoint) override {
+    for (auto& ev : rejected_events_) {
+      ev->cb_(endpoint.c_str(), GetPortName().c_str(), ev->user_data_);
+    }
+  }
+
+ private:
+  std::list<std::unique_ptr<Event<rpc_port_proxy_connected_event_cb>>>
+      connected_events_;
+  std::list<std::unique_ptr<Event<rpc_port_proxy_disconnected_event_cb>>>
+      disconnected_events_;
+  std::list<std::unique_ptr<Event<rpc_port_proxy_rejected_event_cb>>>
+      rejected_events_;
+};
+
+class StubExt : public Stub, public Stub::IEventListener {
+ public:
+  StubExt(const std::string& port, bool mock = false) : Stub(port, mock) {}
+  virtual ~StubExt() = default;
+
+  void AddConnectedEventListener(rpc_port_stub_connected_event_cb cb,
+                            void* user_data) {
+    connected_events_.emplace_back(
+        new Event<rpc_port_stub_connected_event_cb>(cb, user_data));
+  }
+
+  void AddDisconnectedEventListener(rpc_port_stub_disconnected_event_cb cb,
+                                    void* user_data) {
+    disconnected_events_.emplace_back(
+        new Event<rpc_port_stub_disconnected_event_cb>(cb, user_data));
+  }
+
+  void AddReceivedEventListener(rpc_port_stub_received_event_cb cb,
+                                void* user_data) {
+    received_events_.emplace_back(
+        new Event<rpc_port_stub_received_event_cb>(cb, user_data));
+  }
+
+  void OnConnected(const std::string& sender) override {
+    for (auto& ev : connected_events_) {
+      ev->cb_(sender.c_str(), ev->user_data_);
+    }
+  }
+
+  void OnDisconnected(const std::string& sender) override {
+    for (auto& ev : disconnected_events_) {
+      ev->cb_(sender.c_str(), ev->user_data_);
+    }
+  }
+
+  void OnReceived(const std::string& sender, Port& port) override {
+    for (auto& ev : received_events_) {
+      ev->cb_(sender.c_str(), &port, ev->user_data_);
+    }
+  }
+
+ private:
+  std::list<std::unique_ptr<Event<rpc_port_stub_connected_event_cb>>>
+      connected_events_;
+  std::list<std::unique_ptr<Event<rpc_port_stub_disconnected_event_cb>>>
+      disconnected_events_;
+  std::list<std::unique_ptr<Event<rpc_port_stub_received_event_cb>>>
+      received_events_;
+};
+
+}  // namespace
 
 RPC_API int rpc_port_read(rpc_port_h h, void* buf, unsigned int size) {
-  return 0;
+  if (h == nullptr)
+    return -1;
+
+  auto port = static_cast<Port*>(h);
+
+  return port->Read(buf, size);
 }
 
 RPC_API int rpc_port_write(rpc_port_h h, const void* buf, unsigned int size) {
-  return 0;
+  if (h == nullptr)
+    return -1;
+
+  auto port = static_cast<Port*>(h);
+
+  return port->Write(buf, size);
 }
 
 RPC_API int rpc_port_proxy_create(rpc_port_proxy_h* h) {
+  auto p = new ::ProxyExt();
+
+  *h = p;
+  return 0;
+}
+
+RPC_API int rpc_port_proxy_create_mockup(rpc_port_proxy_h* h) {
+  auto p = new ::ProxyExt(true);
+
+  *h = p;
   return 0;
 }
 
 RPC_API int rpc_port_proxy_destroy(rpc_port_proxy_h h) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::ProxyExt*>(h);
+
+  delete p;
   return 0;
 }
 
 RPC_API int rpc_port_proxy_connect(rpc_port_proxy_h h, const char* appid,
     const char* port) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::ProxyExt*>(h);
+
+  p->Connect(appid, port, p);
   return 0;
 }
 
 RPC_API int rpc_port_proxy_add_connected_event_cb(rpc_port_proxy_h h,
     rpc_port_proxy_connected_event_cb cb, void *data) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::ProxyExt*>(h);
+
+  p->AddConnectedEventListener(cb, data);
   return 0;
 }
 
 RPC_API int rpc_port_proxy_add_disconnected_event_cb(rpc_port_proxy_h h,
     rpc_port_proxy_disconnected_event_cb cb, void* data) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::ProxyExt*>(h);
+
+  p->AddDisconnectedEventListener(cb, data);
   return 0;
 }
 
 RPC_API int rpc_port_proxy_add_rejected_event_cb(rpc_port_proxy_h h,
     rpc_port_proxy_rejected_event_cb cb, void* data) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::ProxyExt*>(h);
+
+  p->AddRejectedEventListener(cb, data);
   return 0;
 }
 
 RPC_API int rpc_port_stub_create(rpc_port_stub_h* h, const char* port_name) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = new ::StubExt(port_name);
+
+  *h = p;
+  return 0;
+}
+
+RPC_API int rpc_port_stub_create_mockup(rpc_port_stub_h* h,
+                                        const char* port_name) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = new ::StubExt(port_name, true);
+
+  *h = p;
   return 0;
 }
 
 RPC_API int rpc_port_stub_destroy(rpc_port_stub_h h) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::StubExt*>(h);
+
+  delete p;
   return 0;
 }
 
 RPC_API int rpc_port_stub_listen(rpc_port_stub_h h) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::StubExt*>(h);
+
+  p->Listen(p);
   return 0;
 }
 
 RPC_API int rpc_port_stub_add_connected_event_cb(rpc_port_stub_h h,
     rpc_port_stub_connected_event_cb cb, void* data) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::StubExt*>(h);
+
+  p->AddConnectedEventListener(cb, data);
   return 0;
 }
 
 RPC_API int rpc_port_stub_add_disconnected_event_cb(rpc_port_stub_h h,
     rpc_port_stub_disconnected_event_cb cb, void* data) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::StubExt*>(h);
+
+  p->AddDisconnectedEventListener(cb, data);
   return 0;
 }
 
-RPC_API int rpc_port_stub_add_recevied_event_cb(rpc_port_stub_h h,
+RPC_API int rpc_port_stub_add_received_event_cb(rpc_port_stub_h h,
     rpc_port_stub_received_event_cb cb, void* data) {
+  if (h == nullptr)
+    return -1;
+
+  auto p = static_cast<::StubExt*>(h);
+
+  p->AddReceivedEventListener(cb, data);
   return 0;
 }
index 109ae8d..b4e7728 100644 (file)
@@ -31,8 +31,8 @@
 namespace rpc_port {
 namespace internal {
 
-Stub::Stub(const std::string& port_name)
-    : port_name_(port_name) {}
+Stub::Stub(const std::string& port_name, bool mock)
+    : fd_broker_(mock), port_name_(port_name) {}
 
 Stub::~Stub() {}
 
index 8c474ac..eec8293 100644 (file)
@@ -40,8 +40,8 @@ class Stub : private FdBroker::IEventListener {
     virtual void OnReceived(const std::string& sender, Port& port) = 0;
   };
 
-  Stub(const std::string& port_name);
-  ~Stub();
+  Stub(const std::string& port_name, bool mock = false);
+  virtual ~Stub();
 
   void Listen(IEventListener* ev);
 
diff --git a/unit_tests/src/rpc_port_test.cc b/unit_tests/src/rpc_port_test.cc
new file mode 100644 (file)
index 0000000..92b86dd
--- /dev/null
@@ -0,0 +1,232 @@
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+#include <iostream>
+#include <stdbool.h>
+#include <stdexcept>
+#include <glib.h>
+
+#include "rpc-port-internal.h"
+
+using namespace std;
+using ::testing::AtLeast;
+
+class RpcPortBase : public ::testing::Test {
+ public:
+  virtual void SetUp() {
+    mainloop_ = g_main_loop_new(nullptr, FALSE);
+
+    int ret = rpc_port_proxy_create_mockup(&proxy_handle_);
+
+    ASSERT_NE(proxy_handle_, nullptr);
+    ASSERT_EQ(ret, 0);
+
+    ret = rpc_port_stub_create_mockup(&stub_handle_, "test_port");
+    ASSERT_NE(stub_handle_, nullptr);
+    ASSERT_EQ(ret, 0);
+  }
+
+  virtual void TearDown() {
+    if (proxy_handle_) {
+      int ret = rpc_port_proxy_destroy(proxy_handle_);
+      ASSERT_EQ(ret, 0);
+    }
+
+    if (stub_handle_) {
+      int ret = rpc_port_stub_destroy(stub_handle_);
+      ASSERT_EQ(ret, 0);
+    }
+
+    g_main_loop_unref(mainloop_);
+    mainloop_ = nullptr;
+  }
+
+  void RunMainLoop() {
+    g_main_loop_run(mainloop_);
+  }
+
+  void Finish() {
+    g_main_loop_quit(mainloop_);
+  }
+
+  void KillStub() {
+    int ret = rpc_port_stub_destroy(stub_handle_);
+    ASSERT_EQ(ret, 0);
+    stub_handle_ = nullptr;
+  }
+
+  void KillProxy() {
+    int ret = rpc_port_proxy_destroy(proxy_handle_);
+    ASSERT_EQ(ret, 0);
+    proxy_handle_ = nullptr;
+  }
+
+  rpc_port_proxy_h proxy_handle_;
+  rpc_port_stub_h stub_handle_;
+  bool touch_proxy_connected_event_cb_ = false;
+  bool touch_stub_connected_event_cb_ = false;
+  bool touch_proxy_rejected_event_cb_ = false;
+
+ private:
+  static GMainLoop* mainloop_;
+
+};
+
+GMainLoop* RpcPortBase::mainloop_ = nullptr;
+
+class RpcPortConnection : public RpcPortBase {
+ public:
+  virtual void SetUp() {
+    RpcPortBase::SetUp();
+    StubSetup();
+    ProxySetup();
+  }
+
+  virtual void TearDown() {
+    RpcPortBase::TearDown();
+  }
+
+  void StubSetup() {
+    int ret = rpc_port_stub_add_received_event_cb(stub_handle_,
+        [](const char* sender, rpc_port_h port, void *data) {
+          RpcPortConnection* p = static_cast<RpcPortConnection*>(data);
+          p->stub_port_ = port;
+          p->Finish();
+        }, this);
+    ASSERT_EQ(ret, 0);
+
+    ret = rpc_port_stub_add_disconnected_event_cb(stub_handle_,
+        [](const char* sender, void *data) {
+          RpcPortConnection* p = static_cast<RpcPortConnection*>(data);
+          p->touch_stub_disconnected_event_cb_ = true;
+          p->Finish();
+        }, this);
+    ASSERT_EQ(ret, 0);
+
+    ret = rpc_port_stub_listen(stub_handle_);
+    ASSERT_EQ(ret, 0);
+  }
+
+  void ProxySetup() {
+    int ret = rpc_port_proxy_add_connected_event_cb(proxy_handle_,
+        [](const char *ep, const char *port_name, rpc_port_h port, void *data) {
+          RpcPortConnection* p = static_cast<RpcPortConnection*>(data);
+          p->proxy_port_ = port;
+        }, this);
+    ASSERT_EQ(ret, 0);
+
+    ret = rpc_port_proxy_add_disconnected_event_cb(proxy_handle_,
+        [](const char *ep, const char *port_name, void *data) {
+          RpcPortConnection* p = static_cast<RpcPortConnection*>(data);
+          p->touch_proxy_disconnected_event_cb_ = true;
+          p->Finish();
+        }, this);
+    ASSERT_EQ(ret, 0);
+
+    ret = rpc_port_proxy_connect(proxy_handle_, "TestApp", "test_port");
+    ASSERT_EQ(ret, 0);
+  }
+
+  rpc_port_h proxy_port_ = nullptr;
+  rpc_port_h stub_port_ = nullptr;
+  bool touch_proxy_disconnected_event_cb_ = false;
+  bool touch_stub_disconnected_event_cb_ = false;
+};
+
+TEST_F(RpcPortBase, rpc_port_event_connect) {
+  int ret = rpc_port_stub_add_connected_event_cb(stub_handle_,
+      [](const char *sender, void *data) {
+        RpcPortBase* p = static_cast<RpcPortBase*>(data);
+
+        p->touch_stub_connected_event_cb_ = true;
+      }, this);
+  ASSERT_EQ(ret, 0);
+
+  ret = rpc_port_stub_listen(stub_handle_);
+  ASSERT_EQ(ret, 0);
+
+  ret = rpc_port_proxy_add_connected_event_cb(proxy_handle_,
+    [](const char *ep, const char *port_name, rpc_port_h port, void *data) {
+      RpcPortBase* p = static_cast<RpcPortBase*>(data);
+
+      p->touch_proxy_connected_event_cb_ = true;
+    }, this);
+  ASSERT_EQ(ret, 0);
+
+  ret = rpc_port_proxy_connect(proxy_handle_, "TestApp", "test_port");
+  ASSERT_EQ(ret, 0);
+
+  ASSERT_TRUE(touch_proxy_connected_event_cb_);
+  ASSERT_TRUE(touch_stub_connected_event_cb_);
+}
+
+TEST_F(RpcPortBase, rpc_port_proxy_event_reject) {
+  int ret = rpc_port_stub_listen(stub_handle_);
+  ASSERT_EQ(ret, 0);
+
+  ret = rpc_port_proxy_add_rejected_event_cb(proxy_handle_,
+    [](const char *ep, const char *port_name, void *data) {
+      RpcPortBase* p = static_cast<RpcPortBase*>(data);
+
+      p->touch_proxy_rejected_event_cb_ = true;
+    }, this);
+  ASSERT_EQ(ret, 0);
+
+  ret = rpc_port_proxy_connect(proxy_handle_, "TestApp", "wrong_port");
+  ASSERT_EQ(ret, 0);
+
+  ASSERT_TRUE(touch_proxy_rejected_event_cb_);
+}
+
+TEST_F(RpcPortConnection, rpc_port_read_write) {
+  char buf[] = "test message";
+  char res[] = "OK";
+  char r_buf[256];
+
+  ASSERT_NE(proxy_port_, nullptr);
+  int ret = rpc_port_write(proxy_port_, buf, sizeof(buf));
+  ASSERT_EQ(ret, 0);
+
+  RunMainLoop();
+  ASSERT_NE(stub_port_, nullptr);
+
+  ret = rpc_port_read(stub_port_, r_buf, sizeof(buf));
+  ASSERT_EQ(ret, 0);
+  ASSERT_STREQ(buf, r_buf);
+
+  ret = rpc_port_write(stub_port_, res, sizeof(res));
+  ASSERT_EQ(ret, 0);
+
+  ret = rpc_port_read(proxy_port_, r_buf, sizeof(res));
+  ASSERT_EQ(ret, 0);
+  ASSERT_STREQ("OK", r_buf);
+}
+
+TEST_F(RpcPortConnection, rpc_port_proxy_disconnected) {
+  KillStub();
+  RunMainLoop();
+
+  ASSERT_TRUE(touch_proxy_disconnected_event_cb_);
+}
+
+TEST_F(RpcPortConnection, rpc_port_stub_disconnected) {
+  KillProxy();
+  RunMainLoop();
+
+  ASSERT_TRUE(touch_stub_disconnected_event_cb_);
+}