Make thread safe codes 42/262642/3
authorHwankyu Jhun <h.jhun@samsung.com>
Tue, 17 Aug 2021 04:07:53 +0000 (13:07 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Tue, 17 Aug 2021 10:40:31 +0000 (19:40 +0900)
The rpc_port_stub_h handle destroys in the main thread. The rpc-port
releases the handle using g_idle_add_full(). Before, the idler is added,
the Ignore() method is called to set nullptr to the event listener.
While calling the callback function, the rpc-port checks whether
the event listener is nullptr or not. If it's nullptr, the callback function
will be returned immediately.

Change-Id: I7728552c38173c548c07ce52c1339c6da9c4b29b
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
src/rpc-port.cc
src/stub-internal.cc
src/stub-internal.hh

index 2ee572d..d02fc44 100644 (file)
@@ -140,7 +140,7 @@ class ProxyExt : public Proxy, public Proxy::IEventListener {
 
 class StubExt : public Stub, public Stub::IEventListener {
  public:
-  explicit StubExt(const std::string& port) : Stub(port) {}
+  explicit StubExt(const std::string& port) : Stub(port), destroying_(false) {}
   virtual ~StubExt() = default;
 
   void AddConnectedEventListener(rpc_port_stub_connected_event_cb cb,
@@ -163,6 +163,9 @@ class StubExt : public Stub, public Stub::IEventListener {
 
   void OnConnected(const std::string& sender,
                    const std::string& instance) override {
+    if (IsDestroying())
+      return;
+
     for (auto& ev : connected_events_) {
       ev->cb_(sender.c_str(), instance.c_str(), ev->user_data_);
     }
@@ -170,6 +173,9 @@ class StubExt : public Stub, public Stub::IEventListener {
 
   void OnDisconnected(const std::string& sender,
                       const std::string& instance) override {
+    if (IsDestroying())
+      return;
+
     for (auto& ev : disconnected_events_) {
       ev->cb_(sender.c_str(), instance.c_str(), ev->user_data_);
     }
@@ -177,6 +183,9 @@ class StubExt : public Stub, public Stub::IEventListener {
 
   int OnReceived(const std::string& sender,
                  const std::string& instance, Port* port) override {
+    if (IsDestroying())
+      return -1;
+
     for (auto& ev : received_events_) {
       int ret = ev->cb_(sender.c_str(), instance.c_str(), port,
           ev->user_data_);
@@ -187,11 +196,20 @@ class StubExt : public Stub, public Stub::IEventListener {
     return 0;
   }
 
+  void SetDestroying(bool destroying) {
+    destroying_ = destroying;
+  }
+
+  bool IsDestroying() {
+    return destroying_;
+  }
+
   std::recursive_mutex& GetMutex() const {
     return mutex_;
   }
 
  private:
+  std::atomic<bool> destroying_;
   std::list<std::unique_ptr<Event<rpc_port_stub_connected_event_cb>>>
       connected_events_;
   std::list<std::unique_ptr<Event<rpc_port_stub_disconnected_event_cb>>>
@@ -406,7 +424,16 @@ RPC_API int rpc_port_stub_destroy(rpc_port_stub_h h) {
   _W("rpc_port_stub_destroy(%p)", h);
   auto p = static_cast<::StubExt*>(h);
   aul_rpc_port_usr_destroy(p->GetPortName().c_str(), rpc_port_get_target_uid());
-  delete p;
+  p->Ignore();
+  p->SetDestroying(true);
+
+  g_idle_add_full(G_PRIORITY_HIGH,
+      [](gpointer data) -> gboolean {
+        auto p = static_cast<::StubExt*>(data);
+        _W("rpc_port_stub_destroy(%p)", p);
+        delete p;
+        return G_SOURCE_REMOVE;
+      }, h, nullptr);
   return RPC_PORT_ERROR_NONE;
 }
 
index 553a9a6..48fe4a3 100644 (file)
@@ -114,6 +114,11 @@ int Stub::Listen(IEventListener* ev, int fd) {
   return server_->Listen();
 }
 
+void Stub::Ignore() {
+  std::lock_guard<std::recursive_mutex> lock(GetMutex());
+  listener_ = nullptr;
+}
+
 void Stub::AddPrivilege(const std::string& privilege) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
   access_controller_.AddPrivilege(privilege);
@@ -165,9 +170,9 @@ void Stub::RemoveAcceptedPorts(std::string instance) {
   }
 }
 
-gboolean Stub::OnDataReceived(GIOChannel* channel, GIOCondition cond,
-    gpointer user_data) {
-  Stub* stub = static_cast<Stub*>(user_data);
+gboolean Stub::AcceptedPort::OnDataReceived(GIOChannel* channel,
+    GIOCondition cond, gpointer user_data) {
+  auto* stub = static_cast<Stub*>(user_data);
   std::lock_guard<std::recursive_mutex> lock(stub->GetMutex());
   auto* listener = stub->listener_;
   if (listener == nullptr) {
@@ -184,7 +189,7 @@ gboolean Stub::OnDataReceived(GIOChannel* channel, GIOCondition cond,
         listener->OnDisconnected(p->GetId(), p->GetInstance());
         stub->RemoveAcceptedPorts(p->GetInstance());
         Aul::NotifyRpcFinished();
-        return G_SOURCE_REMOVE;
+        return G_SOURCE_CONTINUE;
       }
 
       int ret = stub->listener_->OnReceived(p->GetId(), p->GetInstance(),
@@ -194,7 +199,7 @@ gboolean Stub::OnDataReceived(GIOChannel* channel, GIOCondition cond,
         listener->OnDisconnected(p->GetId(), p->GetInstance());
         stub->RemoveAcceptedPorts(p->GetInstance());
         Aul::NotifyRpcFinished();
-        return G_SOURCE_REMOVE;
+        return G_SOURCE_CONTINUE;
       }
 
       break;
@@ -204,8 +209,8 @@ gboolean Stub::OnDataReceived(GIOChannel* channel, GIOCondition cond,
   return G_SOURCE_CONTINUE;
 }
 
-gboolean Stub::OnSocketDisconnected(GIOChannel* channel, GIOCondition cond,
-    gpointer user_data) {
+gboolean Stub::AcceptedPort::OnSocketDisconnected(GIOChannel* channel,
+    GIOCondition cond, gpointer user_data) {
   auto* stub = static_cast<Stub*>(user_data);
   std::lock_guard<std::recursive_mutex> lock(stub->GetMutex());
   auto* listener = stub->listener_;
@@ -277,11 +282,11 @@ Stub::AcceptedPort::AcceptedPort(Stub* parent, bool isDelegate, int fd,
 }
 
 Stub::AcceptedPort::~AcceptedPort() {
-  if (disconn_src_ > 0)
-    g_source_remove(disconn_src_);
+  if (disconn_source_ > 0)
+    g_source_remove(disconn_source_);
 
-  if (src_ > 0)
-    g_source_remove(src_);
+  if (source_ > 0)
+    g_source_remove(source_);
 
   if (channel_ != nullptr)
     g_io_channel_unref(channel_);
@@ -294,10 +299,10 @@ int Stub::AcceptedPort::Watch(bool receive) {
     return -1;
   }
 
-  disconn_src_ = g_io_add_watch(channel_,
+  disconn_source_ = g_io_add_watch(channel_,
       static_cast<GIOCondition>(G_IO_ERR | G_IO_HUP | G_IO_NVAL),
-      Stub::OnSocketDisconnected, parent_);
-  if (disconn_src_ == 0) {
+      OnSocketDisconnected, parent_);
+  if (disconn_source_ == 0) {
     _E("g_io_add_watch() is failed");
     return -1;
   }
@@ -305,9 +310,9 @@ int Stub::AcceptedPort::Watch(bool receive) {
   if (!receive)
     return 0;
 
-  src_ = g_io_add_watch(channel_, static_cast<GIOCondition>(G_IO_IN),
-      Stub::OnDataReceived, parent_);
-  if (src_ == 0) {
+  source_ = g_io_add_watch(channel_, static_cast<GIOCondition>(G_IO_IN),
+      OnDataReceived, parent_);
+  if (source_ == 0) {
     _E("g_io_add_watch() is failed");
     return -1;
   }
@@ -347,9 +352,9 @@ int Stub::Server::Listen() {
 
 gboolean Stub::Server::OnRequestReceived(GIOChannel* channel, GIOCondition cond,
     gpointer user_data) {
-  Stub* stub = static_cast<Stub*>(user_data);
+  auto* stub = static_cast<Stub*>(user_data);
   std::lock_guard<std::recursive_mutex> lock(stub->GetMutex());
-  if (stub->server_.get() == nullptr) {
+  if (stub->listener_ == nullptr) {
     _E("Invalid context");
     return G_SOURCE_REMOVE;
   }
index 28114cc..7e32d4d 100644 (file)
@@ -50,6 +50,7 @@ class Stub {
   virtual ~Stub();
 
   int Listen(IEventListener* ev, int fd);
+  void Ignore();
   void AddPrivilege(const std::string& privilege);
   void SetTrusted(const bool trusted);
   std::shared_ptr<Port> FindPort(const std::string& instance) const;
@@ -70,11 +71,15 @@ class Stub {
 
    private:
     int Watch(bool receive);
+    static gboolean OnDataReceived(GIOChannel* channel, GIOCondition cond,
+        gpointer user_data);
+    static gboolean OnSocketDisconnected(GIOChannel* channel, GIOCondition cond,
+        gpointer user_data);
 
    private:
     GIOChannel* channel_ = nullptr;
-    int disconn_src_ = 0;
-    int src_ = 0;
+    guint disconn_source_ = 0;
+    guint source_ = 0;
     Stub* parent_;
     bool is_delegate_ = false;
   };
@@ -96,10 +101,6 @@ class Stub {
     guint source_ = 0;;
   };
 
-  static gboolean OnDataReceived(GIOChannel* channel, GIOCondition cond,
-      gpointer user_data);
-  static gboolean OnSocketDisconnected(GIOChannel* channel, GIOCondition cond,
-      gpointer user_data);
   void AddAcceptedPort(const std::string& sender_appid,
       const std::string& instance, const std::string& port_type, int fd);
   void RemoveAcceptedPorts(std::string instance);