From 4609ea75adb939dddf21eee3e034cabd68c21781 Mon Sep 17 00:00:00 2001 From: Changgyu Choi Date: Mon, 20 Mar 2023 15:23:39 +0900 Subject: [PATCH] Add CynaraThread Some cynara apis cause blocking a thread. This patch adds a sub thread for processing cynara requests. Change-Id: Ib800d034ce0e545f18327b00f00ace729de8a8f2 Signed-off-by: Changgyu Choi --- src/ac-internal.cc | 23 ++++++++++++++++++ src/ac-internal.hh | 4 +++ src/cynara_thread.cc | 62 ++++++++++++++++++++++++++++++++++++++++++++++ src/cynara_thread.hh | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/stub-internal.cc | 49 +++++++++++++++++++++++-------------- src/stub-internal.hh | 2 ++ 6 files changed, 191 insertions(+), 18 deletions(-) create mode 100644 src/cynara_thread.cc create mode 100644 src/cynara_thread.hh diff --git a/src/ac-internal.cc b/src/ac-internal.cc index 98addbb..8dbac48 100644 --- a/src/ac-internal.cc +++ b/src/ac-internal.cc @@ -25,6 +25,7 @@ #include "ac-internal.hh" #include "aul-internal.hh" #include "log-private.hh" +#include "cynara_thread.hh" namespace rpc_port { namespace internal { @@ -93,6 +94,28 @@ int AccessController::Check(int fd, const std::string& sender_appid) { return ret; } +void AccessController::CheckAsync(int fd, std::string sender_appid, + CompleteCallback callback) { + Job job([=]() -> Job::Type { + int res = Check(fd, sender_appid); + auto* cbdata = new std::pair(callback, res); + if (callback != nullptr) { + g_idle_add( + [](gpointer data) -> gboolean { + auto* cbdata = static_cast*>(data); + auto [callback, res] = *cbdata; + callback(res); + delete (cbdata); + return G_SOURCE_REMOVE; + }, cbdata); + } + + return Job::Type::Continue; + }); + + CynaraThread::GetInst().Push(std::move(job)); +} + AccessController::Cynara::Cynara() : cynara_(nullptr, cynara_finish), client_(nullptr, std::free), user_(nullptr, std::free) { diff --git a/src/ac-internal.hh b/src/ac-internal.hh index 7b60c62..a2309ee 100644 --- a/src/ac-internal.hh +++ b/src/ac-internal.hh @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -30,6 +31,8 @@ namespace rpc_port { namespace internal { +using CompleteCallback = std::function; + class AccessController { public: explicit AccessController(bool trusted = false) : trusted_(trusted) {} @@ -37,6 +40,7 @@ class AccessController { void AddPrivilege(std::string privilege); void SetTrusted(const bool trusted); int Check(int fd, const std::string& sender_appid); + void CheckAsync(int fd, std::string sender_appid, CompleteCallback callback); private: class Cynara { diff --git a/src/cynara_thread.cc b/src/cynara_thread.cc new file mode 100644 index 0000000..84554e8 --- /dev/null +++ b/src/cynara_thread.cc @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cynara_thread.hh" + +#include + +namespace rpc_port { +namespace internal { + +Job::Job(Job::JobHandlerCallback cb) : cb_(std::move(cb)) {} + +Job::Job() : cb_([]() { return Job::Type::Finish; }) {} + +Job::Type Job::Do() { + return cb_(); +} + +CynaraThread& CynaraThread::GetInst() { + static CynaraThread* inst = new CynaraThread(); + return *inst; +} + +CynaraThread::CynaraThread() { + thread_ = std::thread([this]() { ThreadRun(); }); +} + +CynaraThread::~CynaraThread() { + Job finish_job; + Push(finish_job); + thread_.join(); +} + +void CynaraThread::ThreadRun() { + while (true) { + Job job; + queue_.WaitAndPop(job); + Job::Type ret = job.Do(); + if (ret == Job::Type::Finish) + return; + } +} + +void CynaraThread::Push(Job job) { + queue_.Push(std::move(job)); +} + +} // namespace internal +} // namespace rpc_port diff --git a/src/cynara_thread.hh b/src/cynara_thread.hh new file mode 100644 index 0000000..827fbc3 --- /dev/null +++ b/src/cynara_thread.hh @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef CYNARA_THREAD_HH_ +#define CYNARA_THREAD_HH_ + +#include +#include + +#include "shared-queue-internal.hh" + +namespace rpc_port { +namespace internal { + +class Job { + public: + enum class Type { + Continue = 0, + Finish = 1, + }; + + using JobHandlerCallback = std::function; + + explicit Job(JobHandlerCallback cb); + Job(); + + ~Job() = default; + + Type Do(); + + private: + JobHandlerCallback cb_; +}; + +class CynaraThread { + public: + static CynaraThread& GetInst(); + CynaraThread(CynaraThread&) = delete; + CynaraThread& operator=(CynaraThread&) = delete; + ~CynaraThread(); + + void ThreadRun(); + void Push(Job job); + + private: + CynaraThread(); + Job Pop(); + + std::thread thread_; + mutable SharedQueue queue_; +}; + +} // namespace internal +} // namespace rpc_port + +#endif // CYNARA_THREAD_HH_ diff --git a/src/stub-internal.cc b/src/stub-internal.cc index 5508bc4..72b59b2 100644 --- a/src/stub-internal.cc +++ b/src/stub-internal.cc @@ -87,8 +87,11 @@ int SendResponse(ClientSocket* client, const Response& response) { } // namespace +std::unordered_set Stub::freed_stubs_; + Stub::Stub(std::string port_name) : port_name_(std::move(port_name)) { _D("Stub::Stub()"); + freed_stubs_.erase(this); } Stub::~Stub() { @@ -101,6 +104,7 @@ Stub::~Stub() { listener_ = nullptr; server_.reset(); + freed_stubs_.insert(this); } int Stub::Listen(IEventListener* ev, int fd) { @@ -363,7 +367,7 @@ gboolean Stub::Server::OnRequestReceived(GIOChannel* channel, GIOCondition cond, return G_SOURCE_REMOVE; } - std::unique_ptr client(stub->server_->Accept()); + std::shared_ptr client(stub->server_->Accept()); if (client.get() == nullptr) { _E("Out of memory"); return G_SOURCE_CONTINUE; @@ -374,21 +378,43 @@ gboolean Stub::Server::OnRequestReceived(GIOChannel* channel, GIOCondition cond, if (ret != 0) return G_SOURCE_CONTINUE; - std::unique_ptr request_auto(request); - std::unique_ptr cred(PeerCred::Get(client->GetFd())); + std::shared_ptr request_auto(request); + std::shared_ptr cred(PeerCred::Get(client->GetFd())); if (cred.get() == nullptr) { _E("Failed to create peer credentials"); return G_SOURCE_CONTINUE; } std::string app_id = Aul::GetAppId(cred->GetPid()); + auto response_func = [=](int res) -> void { + if (freed_stubs_.find(stub) != freed_stubs_.end()) + return; + + Response response(res); + int ret = SendResponse(client.get(), response); + if (ret != 0) + return; + + if (res != 0) { + _E("Access denied. fd(%d), pid(%d)", client->GetFd(), cred->GetPid()); + return; + } + + client->SetNonblock(); + int client_fd = client->RemoveFd(); + stub->AddAcceptedPort(app_id, request_auto->GetInstance(), + request_auto->GetPortType(), client_fd); + }; + int res; if (cred->GetUid() >= kRegularUidMin) { if (cred->GetUid() != getuid() && getuid() >= kRegularUidMin) { _E("Reject request. %u:%u", cred->GetUid(), getuid()); res = -1; } else { - res = stub->access_controller_.Check(client->GetFd(), app_id); + stub->access_controller_.CheckAsync(client->GetFd(), app_id, + response_func); + return G_SOURCE_CONTINUE; } } else { _W("Bypass access control. pid(%d), uid(%u)", @@ -396,20 +422,7 @@ gboolean Stub::Server::OnRequestReceived(GIOChannel* channel, GIOCondition cond, res = 0; } - Response response(res); - ret = SendResponse(client.get(), response); - if (ret != 0) - return G_SOURCE_CONTINUE; - - if (res != 0) { - _E("Access denied. fd(%d), pid(%d)", client->GetFd(), cred->GetPid()); - return G_SOURCE_CONTINUE; - } - - client->SetNonblock(); - int client_fd = client->RemoveFd(); - stub->AddAcceptedPort(app_id, request->GetInstance(), request->GetPortType(), - client_fd); + response_func(res); return G_SOURCE_CONTINUE; } diff --git a/src/stub-internal.hh b/src/stub-internal.hh index 7e32d4d..0f2723c 100644 --- a/src/stub-internal.hh +++ b/src/stub-internal.hh @@ -25,6 +25,7 @@ #include #include #include +#include #include "ac-internal.hh" #include "debug-port-internal.hh" @@ -113,6 +114,7 @@ class Stub { IEventListener* listener_ = nullptr; std::unique_ptr server_; mutable std::recursive_mutex mutex_; + static std::unordered_set freed_stubs_; }; } // namespace internal -- 2.7.4