Add CynaraThread 68/290168/3
authorChanggyu Choi <changyu.choi@samsung.com>
Mon, 20 Mar 2023 06:23:39 +0000 (15:23 +0900)
committerChanggyu Choi <changyu.choi@samsung.com>
Tue, 21 Mar 2023 08:07:14 +0000 (17:07 +0900)
Some cynara apis cause blocking a thread.
This patch adds a sub thread for processing cynara requests.

Change-Id: Ib800d034ce0e545f18327b00f00ace729de8a8f2
Signed-off-by: Changgyu Choi <changyu.choi@samsung.com>
src/ac-internal.cc
src/ac-internal.hh
src/cynara_thread.cc [new file with mode: 0644]
src/cynara_thread.hh [new file with mode: 0644]
src/stub-internal.cc
src/stub-internal.hh

index 98addbb..8dbac48 100644 (file)
@@ -25,6 +25,7 @@
 #include "ac-internal.hh"
 #include "aul-internal.hh"
 #include "log-private.hh"
+#include "cynara_thread.hh"
 
 namespace rpc_port {
 namespace internal {
@@ -93,6 +94,28 @@ int AccessController::Check(int fd, const std::string& sender_appid) {
   return ret;
 }
 
+void AccessController::CheckAsync(int fd, std::string sender_appid,
+    CompleteCallback callback) {
+  Job job([=]() -> Job::Type {
+    int res = Check(fd, sender_appid);
+    auto* cbdata = new std::pair<CompleteCallback, int>(callback, res);
+    if (callback != nullptr) {
+      g_idle_add(
+          [](gpointer data) -> gboolean {
+            auto* cbdata = static_cast<std::pair<CompleteCallback, int>*>(data);
+            auto [callback, res] = *cbdata;
+            callback(res);
+            delete (cbdata);
+            return G_SOURCE_REMOVE;
+          }, cbdata);
+    }
+
+    return Job::Type::Continue;
+  });
+
+  CynaraThread::GetInst().Push(std::move(job));
+}
+
 AccessController::Cynara::Cynara()
     : cynara_(nullptr, cynara_finish), client_(nullptr, std::free),
     user_(nullptr, std::free) {
index 7b60c62..a2309ee 100644 (file)
@@ -22,6 +22,7 @@
 #include <glib-unix.h>
 #include <glib.h>
 
+#include <functional>
 #include <map>
 #include <memory>
 #include <string>
@@ -30,6 +31,8 @@
 namespace rpc_port {
 namespace internal {
 
+using CompleteCallback = std::function<void(int)>;
+
 class AccessController {
  public:
   explicit AccessController(bool trusted = false) : trusted_(trusted) {}
@@ -37,6 +40,7 @@ class AccessController {
   void AddPrivilege(std::string privilege);
   void SetTrusted(const bool trusted);
   int Check(int fd, const std::string& sender_appid);
+  void CheckAsync(int fd, std::string sender_appid, CompleteCallback callback);
 
  private:
   class Cynara {
diff --git a/src/cynara_thread.cc b/src/cynara_thread.cc
new file mode 100644 (file)
index 0000000..84554e8
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cynara_thread.hh"
+
+#include <utility>
+
+namespace rpc_port {
+namespace internal {
+
+Job::Job(Job::JobHandlerCallback cb) : cb_(std::move(cb)) {}
+
+Job::Job() : cb_([]() { return Job::Type::Finish; }) {}
+
+Job::Type Job::Do() {
+  return cb_();
+}
+
+CynaraThread& CynaraThread::GetInst() {
+  static CynaraThread* inst = new CynaraThread();
+  return *inst;
+}
+
+CynaraThread::CynaraThread() {
+  thread_ = std::thread([this]() { ThreadRun(); });
+}
+
+CynaraThread::~CynaraThread() {
+  Job finish_job;
+  Push(finish_job);
+  thread_.join();
+}
+
+void CynaraThread::ThreadRun() {
+  while (true) {
+    Job job;
+    queue_.WaitAndPop(job);
+    Job::Type ret = job.Do();
+    if (ret == Job::Type::Finish)
+      return;
+  }
+}
+
+void CynaraThread::Push(Job job) {
+  queue_.Push(std::move(job));
+}
+
+}  // namespace internal
+}  // namespace rpc_port
diff --git a/src/cynara_thread.hh b/src/cynara_thread.hh
new file mode 100644 (file)
index 0000000..827fbc3
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CYNARA_THREAD_HH_
+#define CYNARA_THREAD_HH_
+
+#include <functional>
+#include <thread>
+
+#include "shared-queue-internal.hh"
+
+namespace rpc_port {
+namespace internal {
+
+class Job {
+ public:
+  enum class Type {
+    Continue = 0,
+    Finish = 1,
+  };
+
+  using JobHandlerCallback = std::function<Type(void)>;
+
+  explicit Job(JobHandlerCallback cb);
+  Job();
+
+  ~Job() = default;
+
+  Type Do();
+
+ private:
+  JobHandlerCallback cb_;
+};
+
+class CynaraThread {
+ public:
+  static CynaraThread& GetInst();
+  CynaraThread(CynaraThread&) = delete;
+  CynaraThread& operator=(CynaraThread&) = delete;
+  ~CynaraThread();
+
+  void ThreadRun();
+  void Push(Job job);
+
+ private:
+  CynaraThread();
+  Job Pop();
+
+  std::thread thread_;
+  mutable SharedQueue<Job> queue_;
+};
+
+}  // namespace internal
+}  // namespace rpc_port
+
+#endif  // CYNARA_THREAD_HH_
index 5508bc4..72b59b2 100644 (file)
@@ -87,8 +87,11 @@ int SendResponse(ClientSocket* client, const Response& response) {
 
 }  // namespace
 
+std::unordered_set<Stub*> Stub::freed_stubs_;
+
 Stub::Stub(std::string port_name) : port_name_(std::move(port_name)) {
   _D("Stub::Stub()");
+  freed_stubs_.erase(this);
 }
 
 Stub::~Stub() {
@@ -101,6 +104,7 @@ Stub::~Stub() {
 
   listener_ = nullptr;
   server_.reset();
+  freed_stubs_.insert(this);
 }
 
 int Stub::Listen(IEventListener* ev, int fd) {
@@ -363,7 +367,7 @@ gboolean Stub::Server::OnRequestReceived(GIOChannel* channel, GIOCondition cond,
     return G_SOURCE_REMOVE;
   }
 
-  std::unique_ptr<ClientSocket> client(stub->server_->Accept());
+  std::shared_ptr<ClientSocket> client(stub->server_->Accept());
   if (client.get() == nullptr) {
     _E("Out of memory");
     return G_SOURCE_CONTINUE;
@@ -374,21 +378,43 @@ gboolean Stub::Server::OnRequestReceived(GIOChannel* channel, GIOCondition cond,
   if (ret != 0)
     return G_SOURCE_CONTINUE;
 
-  std::unique_ptr<Request> request_auto(request);
-  std::unique_ptr<PeerCred> cred(PeerCred::Get(client->GetFd()));
+  std::shared_ptr<Request> request_auto(request);
+  std::shared_ptr<PeerCred> cred(PeerCred::Get(client->GetFd()));
   if (cred.get() == nullptr) {
     _E("Failed to create peer credentials");
     return G_SOURCE_CONTINUE;
   }
 
   std::string app_id = Aul::GetAppId(cred->GetPid());
+  auto response_func = [=](int res) -> void {
+    if (freed_stubs_.find(stub) != freed_stubs_.end())
+      return;
+
+    Response response(res);
+    int ret = SendResponse(client.get(), response);
+    if (ret != 0)
+      return;
+
+    if (res != 0) {
+      _E("Access denied. fd(%d), pid(%d)", client->GetFd(), cred->GetPid());
+      return;
+    }
+
+    client->SetNonblock();
+    int client_fd = client->RemoveFd();
+    stub->AddAcceptedPort(app_id, request_auto->GetInstance(),
+        request_auto->GetPortType(), client_fd);
+  };
+
   int res;
   if (cred->GetUid() >= kRegularUidMin) {
     if (cred->GetUid() != getuid() && getuid() >= kRegularUidMin) {
       _E("Reject request. %u:%u", cred->GetUid(), getuid());
       res = -1;
     } else {
-      res = stub->access_controller_.Check(client->GetFd(), app_id);
+      stub->access_controller_.CheckAsync(client->GetFd(), app_id,
+          response_func);
+      return G_SOURCE_CONTINUE;
     }
   } else {
     _W("Bypass access control. pid(%d), uid(%u)",
@@ -396,20 +422,7 @@ gboolean Stub::Server::OnRequestReceived(GIOChannel* channel, GIOCondition cond,
     res = 0;
   }
 
-  Response response(res);
-  ret = SendResponse(client.get(), response);
-  if (ret != 0)
-    return G_SOURCE_CONTINUE;
-
-  if (res != 0) {
-    _E("Access denied. fd(%d), pid(%d)", client->GetFd(), cred->GetPid());
-    return G_SOURCE_CONTINUE;
-  }
-
-  client->SetNonblock();
-  int client_fd = client->RemoveFd();
-  stub->AddAcceptedPort(app_id, request->GetInstance(), request->GetPortType(),
-      client_fd);
+  response_func(res);
   return G_SOURCE_CONTINUE;
 }
 
index 7e32d4d..0f2723c 100644 (file)
@@ -25,6 +25,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <unordered_set>
 
 #include "ac-internal.hh"
 #include "debug-port-internal.hh"
@@ -113,6 +114,7 @@ class Stub {
   IEventListener* listener_ = nullptr;
   std::unique_ptr<Server> server_;
   mutable std::recursive_mutex mutex_;
+  static std::unordered_set<Stub*> freed_stubs_;
 };
 
 }  // namespace internal