Refactor DebugPort implementation 94/315494/2
authorHwankyu Jhun <h.jhun@samsung.com>
Thu, 1 Aug 2024 23:30:18 +0000 (08:30 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Fri, 2 Aug 2024 00:05:39 +0000 (09:05 +0900)
The parameters of AddSession() method ars changed.

Change-Id: Ib75c008d6fd9f63bb673c68b6be9488ca9b83821
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
src/rpc-port/debug-port-internal.cc
src/rpc-port/debug-port-internal.hh
src/rpc-port/proxy-internal.cc
src/rpc-port/stub-internal.cc

index 42144956a58e88cac280c0e5c5329cc5cdd1c9b2..d343b2c43c1601dbac0780332951018b474257ea 100644 (file)
@@ -49,29 +49,32 @@ constexpr const char KEY_PORT_NAME[] = "__K_PORT_NAME__";
 
 class Session {
  public:
-  Session(std::string port_name, std::string destination, int main_read_fd,
-          int main_write_fd, int delegate_read_fd, int delegate_write_fd)
+  Session(std::string port_name, std::shared_ptr<Port> main_port,
+          std::shared_ptr<Port> delegate_port)
       : port_name_(std::move(port_name)),
-        destination_(std::move(destination)),
-        main_read_fd_(main_read_fd),
-        main_write_fd_(main_write_fd),
-        delegate_read_fd_(delegate_read_fd),
-        delegate_write_fd_(delegate_write_fd) {}
+        main_port_(std::move(main_port)),
+        delegate_port_(std::move(delegate_port)) {}
 
   const std::string& GetPortName() const { return port_name_; }
-  const std::string& GetDestination() const { return destination_; }
-  int GetMainReadFd() const { return main_read_fd_; }
-  int GetMainWriteFd() const { return main_write_fd_; }
-  int GetDelegateReadFd() const { return delegate_read_fd_; }
-  int GetDelegateWriteFd() const { return delegate_write_fd_; }
+
+  const std::string& GetDestination() const { return main_port_->GetId(); }
+
+  const std::shared_ptr<Port>& GetMainPort() const { return main_port_; }
+
+  const std::shared_ptr<Port>& GetDelegatePort() const {
+    return delegate_port_;
+  }
+
+  bool PortExist(int fd) const {
+    return (main_port_->GetReadFd() == fd || main_port_->GetWriteFd() == fd ||
+            delegate_port_->GetReadFd() == fd ||
+            delegate_port_->GetWriteFd() == fd);
+  }
 
  private:
   std::string port_name_;
-  std::string destination_;
-  int main_read_fd_;
-  int main_write_fd_;
-  int delegate_read_fd_;
-  int delegate_write_fd_;
+  std::shared_ptr<Port> main_port_;
+  std::shared_ptr<Port> delegate_port_;
 };
 
 class DebugPortImpl {
@@ -81,12 +84,11 @@ class DebugPortImpl {
 
   void Dispose();
   bool IsConnected() const;
-  void AddSession(std::string port_name, std::string destination,
-                  int main_read_fd, int main_write_fd, int delegate_read_fd,
-                  int delegate_write_fd);
+  void AddSession(std::string port_name, std::shared_ptr<Port> main_port,
+                  std::shared_ptr<Port> delegate_port);
   void RemoveSession(int port);
-  int Send(int port, bool is_read, uint32_t seq,
-      const void* buf, unsigned int size);
+  void Send(int port, bool is_read, uint32_t seq, const void* buf,
+            unsigned int size);
   void Init();
 
  private:
@@ -101,20 +103,19 @@ class DebugPortImpl {
   std::shared_ptr<Session> FindSession(int port);
   std::shared_ptr<Session> FindSession(const std::string& port_name);
 
-  static gboolean OnDebugPortDisconnectedCb(GIOChannel* io,
-      GIOCondition cond, gpointer data);
+  static gboolean OnDebugPortDisconnectedCb(gint fd, GIOCondition cond,
+                                            gpointer data);
   static int AppComCb(const char* endpoint, aul_app_com_result_e result,
-      bundle* envelope, void* user_data);
+                      bundle* envelope, void* user_data);
 
  private:
   bool disposed_ = true;
-  std::atomic<bool> connected_ { false };
-  std::unique_ptr<Port> port_;
-  GIOChannel* io_ = nullptr;
-  guint watch_tag_ = 0;
+  std::atomic<bool> connected_{false};
+  std::shared_ptr<Port> port_;
+  guint source_ = 0;
   std::list<std::shared_ptr<Session>> sessions_;
   std::thread thread_;
-  std::atomic<bool> is_running_ { false };
+  std::atomic<bool> is_running_{false};
   tizen_base::SharedQueue<std::shared_ptr<tizen_base::Parcel>> queue_;
   mutable std::recursive_mutex mutex_;
   aul_app_com_connection_h conn_ = nullptr;
@@ -126,8 +127,7 @@ DebugPortImpl::~DebugPortImpl() {
 
 void DebugPortImpl::Dispose() {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
-  if (disposed_)
-    return;
+  if (disposed_) return;
 
   if (conn_) {
     aul_app_com_leave(conn_);
@@ -139,28 +139,23 @@ void DebugPortImpl::Dispose() {
   disposed_ = true;
 }
 
-bool DebugPortImpl::IsConnected() const {
-  return connected_;
-}
+bool DebugPortImpl::IsConnected() const { return connected_; }
 
-void DebugPortImpl::AddSession(std::string port_name, std::string destination,
-                               int main_read_fd, int main_write_fd,
-                               int delegate_read_fd, int delegate_write_fd) {
+void DebugPortImpl::AddSession(std::string port_name,
+                               std::shared_ptr<Port> main_port,
+                               std::shared_ptr<Port> delegate_port) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
-  sessions_.emplace_back(
-      new Session(std::move(port_name), std::move(destination), main_read_fd,
-                  main_write_fd, delegate_read_fd, delegate_write_fd));
+  sessions_.emplace_back(new Session(std::move(port_name), std::move(main_port),
+                                     std::move(delegate_port)));
 }
 
 void DebugPortImpl::RemoveSession(int port) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
-  auto iter = std::find_if(sessions_.begin(), sessions_.end(),
-                           [port](std::shared_ptr<Session>& sess) -> bool {
-                             return sess->GetMainReadFd() == port ||
-                                    sess->GetMainWriteFd() == port ||
-                                    sess->GetDelegateReadFd() == port ||
-                                    sess->GetDelegateWriteFd() == port;
-                           });
+  auto iter =
+      std::find_if(sessions_.begin(), sessions_.end(),
+                   [port](const std::shared_ptr<Session>& session) -> bool {
+                     return session->PortExist(port);
+                   });
   if (iter != sessions_.end()) {
     _W("Remove session. port(%d)", port);
     iter = sessions_.erase(iter);
@@ -169,10 +164,8 @@ void DebugPortImpl::RemoveSession(int port) {
 
 std::shared_ptr<Session> DebugPortImpl::FindSession(int fd) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
-  for (auto& s : sessions_) {
-    if (s->GetMainReadFd() == fd || s->GetMainWriteFd() == fd ||
-        s->GetDelegateReadFd() == fd || s->GetDelegateWriteFd() == fd)
-      return s;
+  for (auto& session : sessions_) {
+    if (session->PortExist(fd)) return session;
   }
 
   return nullptr;
@@ -181,32 +174,32 @@ std::shared_ptr<Session> DebugPortImpl::FindSession(int fd) {
 std::shared_ptr<Session> DebugPortImpl::FindSession(
     const std::string& port_name) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
-  for (auto& s : sessions_) {
-    if (s->GetPortName() == port_name)
-      return s;
+  for (auto& session : sessions_) {
+    if (session->GetPortName() == port_name) return session;
   }
 
   return nullptr;
 }
 
-int DebugPortImpl::Send(int port, bool is_read, uint32_t seq,
-    const void* buf, unsigned int size) {
-  if (!IsConnected())
-    return 0;
+void DebugPortImpl::Send(int port, bool is_read, uint32_t seq, const void* buf,
+                         unsigned int size) {
+  if (!IsConnected()) return;
 
   auto session = FindSession(port);
   if (session == nullptr) {
     _E("Failed to find session. port(%d)", port);
-    return -1;
+    return;
   }
 
-  // time + port_name + destination + is_delegate + port + is_read + seq + size + data
+  // time + port_name + destination + is_delegate + port +
+  // is_read + seq + size + data
   tizen_base::Parcel parcel;
   parcel.WriteInt64(time(nullptr));
   parcel.WriteString(session->GetPortName().c_str());
   parcel.WriteString(session->GetDestination().c_str());
-  parcel.WriteBool(session->GetDelegateReadFd() == port ||
-                   session->GetDelegateWriteFd() == port);
+  const auto& delegate_port = session->GetDelegatePort();
+  parcel.WriteBool(delegate_port->GetReadFd() == port ||
+                   delegate_port->GetWriteFd() == port);
   parcel.WriteInt32(port);
   parcel.WriteBool(is_read);
   parcel.WriteInt32(seq);
@@ -214,7 +207,6 @@ int DebugPortImpl::Send(int port, bool is_read, uint32_t seq,
   parcel.Write(static_cast<const unsigned char*>(buf), size);
 
   queue_.Push(std::make_shared<tizen_base::Parcel>(parcel));
-  return 0;
 }
 
 void DebugPortImpl::Init() {
@@ -223,18 +215,15 @@ void DebugPortImpl::Init() {
     return;
 
   aul_app_com_create_async(ENDPOINT_RPC_PORT_DEBUG, nullptr, AppComCb, this,
-      &conn_);
-  if (conn_ == nullptr)
-    return;
+                           &conn_);
+  if (conn_ == nullptr) return;
 
   do {
     int fd = Connect();
-    if (fd < 0)
-      break;
+    if (fd < 0) break;
 
     port_.reset(new Port(-1, fd, "Debug"));
-    if (Watch(fd) < 0)
-      break;
+    if (Watch(fd) < 0) break;
 
     SetConnectionStatus(true);
     _W("Connected");
@@ -245,8 +234,7 @@ void DebugPortImpl::Init() {
 }
 
 int DebugPortImpl::Connect() {
-  if (access(PATH_RPC_PORT_UTIL_SOCK, F_OK) != 0)
-    return -1;
+  if (access(PATH_RPC_PORT_UTIL_SOCK, F_OK) != 0) return -1;
 
   int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
   if (fd < 0) {
@@ -258,58 +246,44 @@ int DebugPortImpl::Connect() {
   addr.sun_family = AF_UNIX;
   snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", PATH_RPC_PORT_UTIL_SOCK);
 
-  int ret = connect(fd, reinterpret_cast<struct sockaddr*>(&addr),
-      sizeof(addr));
+  g_unix_set_fd_nonblocking(fd, TRUE, nullptr);
+  int ret = connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
   if (ret < 0) {
     _E("connect() is failed. fd(%d), errno(%d)", fd, errno);
     close(fd);
     return -1;
   }
 
+
   return fd;
 }
 
 int DebugPortImpl::Watch(int fd) {
-  GIOChannel* io = g_io_channel_unix_new(fd);
-  if (io == nullptr) {
-    _E("g_io_channel_unix_new() is failed");
-    return -1;
-  }
-
-  GIOCondition cond = static_cast<GIOCondition>(
-      (G_IO_ERR | G_IO_HUP | G_IO_NVAL));
-  guint tag = g_io_add_watch(io, cond, OnDebugPortDisconnectedCb, this);
-  if (tag == 0) {
-    _E("g_io_add_watch() is failed");
-    g_io_channel_unref(io);
+  source_ = g_unix_fd_add(
+      fd, static_cast<GIOCondition>(G_IO_ERR | G_IO_HUP | G_IO_NVAL),
+      OnDebugPortDisconnectedCb, this);
+  if (source_ == 0) {
+    _E("g_unix_fd_add() is failed");
     return -1;
   }
 
-  io_ = io;
-  watch_tag_ = tag;
   return 0;
 }
 
 void DebugPortImpl::Unwatch() {
-  if (io_) {
-    g_io_channel_unref(io_);
-    io_ = nullptr;
-  }
-
-  if (watch_tag_) {
-    g_source_remove(watch_tag_);
-    watch_tag_ = 0;
+  if (source_) {
+    g_source_remove(source_);
+    source_ = 0;
   }
 }
 
-gboolean DebugPortImpl::OnDebugPortDisconnectedCb(GIOChannel* io,
-    GIOCondition cond, gpointer data) {
+gboolean DebugPortImpl::OnDebugPortDisconnectedCb(gint fd, GIOCondition cond,
+                                                  gpointer data) {
   _W("cond(%d)", static_cast<int>(cond));
   auto* debug_port = static_cast<DebugPortImpl*>(data);
   std::lock_guard<std::recursive_mutex> lock(debug_port->GetMutex());
   debug_port->SetConnectionStatus(false);
-  debug_port->watch_tag_ = 0;
-  debug_port->Unwatch();
+  debug_port->source_ = 0;
   debug_port->port_.reset();
   _W("Disconnected");
   return G_SOURCE_REMOVE;
@@ -320,37 +294,35 @@ void DebugPortImpl::SetConnectionStatus(bool status) {
 }
 
 void DebugPortImpl::CreateThread() {
-  if (is_running_)
-    return;
+  if (is_running_) return;
 
   thread_ = std::thread([&]() {
-      _W("START");
-      do {
-        std::shared_ptr<tizen_base::Parcel> parcel = queue_.WaitAndPop();
-        int len = parcel->GetDataSize();
-        if (len == 0) {
-          _W("Done");
-          break;
-        }
-
-        if (!IsConnected())
-          continue;
-
-        int ret = port_->Write(reinterpret_cast<void*>(&len), sizeof(len));
-        if (ret < 0) {
-          _E("Failed to write size");
-          SetConnectionStatus(false);
-          continue;
-        }
-
-        ret = port_->Write(parcel->GetData(), len);
-        if (ret < 0) {
-          _E("Failed to write data");
-          SetConnectionStatus(false);
-        }
-      } while (true);
-      _W("END");
-    });
+    _W("START");
+    do {
+      std::shared_ptr<tizen_base::Parcel> parcel = queue_.WaitAndPop();
+      int len = parcel->GetDataSize();
+      if (len == 0) {
+        _W("Done");
+        break;
+      }
+
+      if (!IsConnected()) continue;
+
+      int ret = port_->Write(reinterpret_cast<void*>(&len), sizeof(len));
+      if (ret < 0) {
+        _E("Failed to write size");
+        SetConnectionStatus(false);
+        continue;
+      }
+
+      ret = port_->Write(parcel->GetData(), len);
+      if (ret < 0) {
+        _E("Failed to write data");
+        SetConnectionStatus(false);
+      }
+    } while (true);
+    _W("END");
+  });
 
   is_running_ = true;
 }
@@ -365,30 +337,24 @@ void DebugPortImpl::JoinThread() {
   }
 }
 
-
-std::recursive_mutex& DebugPortImpl::GetMutex() const {
-  return mutex_;
-}
+std::recursive_mutex& DebugPortImpl::GetMutex() const { return mutex_; }
 
 int DebugPortImpl::AppComCb(const char* endpoint, aul_app_com_result_e result,
-    bundle* envelope, void* user_data) {
+                            bundle* envelope, void* user_data) {
   const char* val = bundle_get_val(envelope, KEY_PORT_NAME);
-  if (val == nullptr)
-    return -1;
+  if (val == nullptr) return -1;
 
   auto* handle = static_cast<DebugPortImpl*>(user_data);
   std::string port_name(val);
   if (port_name.empty() || handle->FindSession(port_name) != nullptr) {
     auto* handle = static_cast<DebugPortImpl*>(user_data);
     int fd = handle->Connect();
-    if (fd < 0)
-      return -1;
+    if (fd < 0) return -1;
 
     std::lock_guard<std::recursive_mutex> lock(handle->GetMutex());
     handle->port_.reset(new Port(-1, fd, "Debug"));
     int ret = handle->Watch(fd);
-    if (ret < 0)
-      return -1;
+    if (ret < 0) return -1;
 
     handle->SetConnectionStatus(true);
     _W("Connected");
@@ -407,24 +373,23 @@ bool DebugPort::IsConnected() {
   return impl.IsConnected();
 }
 
-void DebugPort::AddSession(std::string port_name, std::string destination,
-                           int main_read_fd, int main_write_fd,
-                           int delegate_read_fd, int delegate_write_fd) {
+void DebugPort::AddSession(std::string port_name,
+                           std::shared_ptr<Port> main_port,
+                           std::shared_ptr<Port> delegate_port) {
   impl.Init();
-  return impl.AddSession(std::move(port_name), std::move(destination),
-                         main_read_fd, main_write_fd, delegate_read_fd,
-                         delegate_write_fd);
+  return impl.AddSession(std::move(port_name), std::move(main_port),
+                         std::move(delegate_port));
 }
 
 void DebugPort::RemoveSession(int port) {
   impl.Init();
-  return impl.RemoveSession(port);
+  impl.RemoveSession(port);
 }
 
-int DebugPort::Send(int port, bool is_read, uint32_t seq, const void* buf,
-    unsigned int size) {
+void DebugPort::Send(int port, bool is_read, uint32_t seq, const void* buf,
+                     unsigned int size) {
   impl.Init();
-  return impl.Send(port, is_read, seq, buf, size);
+  impl.Send(port, is_read, seq, buf, size);
 }
 
 }  // namespace internal
index 080fc4a2c36d061c0586e1f230a59337c3542a97..6b9023c519a81356f6c7cd3bcbcd6d0487480ed6 100644 (file)
@@ -19,6 +19,9 @@
 
 #include <cstdint>
 #include <string>
+#include <memory>
+
+#include "rpc-port/port-internal.hh"
 
 namespace rpc_port {
 namespace internal {
@@ -26,12 +29,11 @@ namespace internal {
 class DebugPort {
  public:
   static bool IsConnected();
-  static void AddSession(std::string port_name, std::string destination,
-                         int main_read_fd, int main_write_fd,
-                         int delegate_read_fd, int delegate_write_fd);
+  static void AddSession(std::string port_name, std::shared_ptr<Port> main_port,
+                         std::shared_ptr<Port> delegate_port);
   static void RemoveSession(int port);
-  static int Send(int port, bool is_read, uint32_t seq,
-      const void* buf, unsigned int size);
+  static void Send(int port, bool is_read, uint32_t seq, const void* buf,
+                   unsigned int size);
 };
 
 }  // namespace internal
index ac2f164ade47ea01619465b00993fbb60f8e6301..86e2969fdd0e97cb9b519413c9493333cb78d261 100644 (file)
@@ -132,16 +132,13 @@ int Proxy::Connect(bool sync) {
   if (ret != RPC_PORT_ERROR_NONE) return ret;
 
   if (sync) {
-    int main_read_fd = main_client_[0]->RemoveFd();
-    int main_write_fd = main_client_[1]->RemoveFd();
-    main_port_.reset(
-        new ProxyPort(main_read_fd, main_write_fd, target_appid_, this, false));
-    int delegate_read_fd = delegate_client_[0]->RemoveFd();
-    int delegate_write_fd = delegate_client_[1]->RemoveFd();
-    delegate_port_.reset(new ProxyPort(delegate_read_fd, delegate_write_fd,
+    main_port_.reset(new ProxyPort(main_client_[0]->RemoveFd(),
+                                   main_client_[1]->RemoveFd(), target_appid_,
+                                   this, false));
+    delegate_port_.reset(new ProxyPort(delegate_client_[0]->RemoveFd(),
+                                       delegate_client_[1]->RemoveFd(),
                                        target_appid_, this));
-    DebugPort::AddSession(port_name_, target_appid_, main_read_fd,
-                          main_write_fd, delegate_read_fd, delegate_write_fd);
+    DebugPort::AddSession(port_name_, main_port_, delegate_port_);
     listener_->OnConnected(target_appid_, main_port_.get());
   }
 
@@ -692,9 +689,7 @@ void Proxy::SetPort(int fd, bool is_read, bool is_delegate) {
        target_appid_.c_str(), port_name_.c_str(), main_port_->GetReadFd(),
        main_port_->GetWriteFd(), delegate_port_->GetReadFd(),
        delegate_port_->GetWriteFd());
-    DebugPort::AddSession(port_name_, target_appid_, main_port_->GetReadFd(),
-                          main_port_->GetWriteFd(), delegate_port_->GetReadFd(),
-                          delegate_port_->GetWriteFd());
+    DebugPort::AddSession(port_name_, main_port_, delegate_port_);
     listener_->OnConnected(target_appid_, main_port_.get());
   }
 }
index dc705f9da2534ea89dfc44dd97b7513575d50773..87dc01199d20edf1307ed250aee65ee8deed88f3 100644 (file)
@@ -289,9 +289,8 @@ void Stub::AddAcceptedPort(const std::string& sender_appid,
      sender_appid.c_str(), instance.c_str(), main_port->GetReadFd(),
      main_port->GetWriteFd(), delegate_port->GetReadFd(),
      delegate_port->GetWriteFd());
-  DebugPort::AddSession(port_name_, sender_appid, main_port->GetReadFd(),
-                        main_port->GetWriteFd(), delegate_port->GetReadFd(),
-                        delegate_port->GetWriteFd());
+  DebugPort::AddSession(port_name_, std::move(main_port),
+                        std::move(delegate_port));
   listener_->OnConnected(sender_appid, instance);
 }