2 * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "worker_thread.hh"
22 #include "abstract_parcelable.hh"
23 #include "command_request_handler.hh"
24 #include "create_cache_request_handler.hh"
25 #include "create_db_request_handler.hh"
26 #include "db_handle_provider.hh"
27 #include "get_appinfo_request_handler.hh"
28 #include "get_cert_request_handler.hh"
29 #include "get_depinfo_request_handler.hh"
30 #include "get_pkginfo_request_handler.hh"
31 #include "query_request_handler.hh"
32 #include "set_cert_request_handler.hh"
33 #include "set_pkginfo_request_handler.hh"
34 #include "utils/logging.hh"
36 #include "pkgmgrinfo_debug.h"
41 #define LOG_TAG "PKGMGR_INFO"
43 #ifndef SQLITE_ENABLE_MEMORY_MANAGEMENT
44 #define SQLITE_ENABLE_MEMORY_MANAGEMENT
47 namespace pkgmgr_server {
49 WorkerThread::Scheduler::Scheduler(pid_t tid) : tid_(tid) {
50 Get(&policy_, ¶m_);
53 void WorkerThread::Scheduler::ChangePolicy() {
54 struct sched_param param = { 0, };
55 param.sched_priority = 1;
56 if (Set(SCHED_RR, ¶m) != 0)
61 LOG(WARNING) << "Current policy: " << policy << ", priority: "
62 << param.sched_priority;
65 void WorkerThread::Scheduler::ResetPolicy() {
66 if (Set(policy_, ¶m_) != 0)
69 struct sched_param param = { 0, };
72 LOG(WARNING) << "Current policy: " << policy << ", priority: "
73 << param.sched_priority;
76 void WorkerThread::Scheduler::Get(int* policy, struct sched_param* param) {
77 if (sched_getparam(tid_, param) != 0)
78 LOG(ERROR) << "sched_getparam() is failed. errno: " << errno;
80 *policy = sched_getscheduler(tid_);
82 LOG(ERROR) << "sched_getscheduler() is failed. errno: " << errno;
85 int WorkerThread::Scheduler::Set(int policy, struct sched_param* param) {
86 if (sched_setscheduler(tid_, policy, param) != 0) {
87 LOG(ERROR) << "sched_setscheduler() is failed. policy: " << policy
88 << ", errno: " << errno;
92 LOG(WARNING) << "policy: " << policy << ", sched_priority: "
93 << param->sched_priority;
97 WorkerThread::WorkerThread(unsigned int num) : stop_all_(false) {
98 threads_.reserve(num);
99 for (unsigned int i = 0; i < num; ++i)
100 threads_.emplace_back([this]() -> void { this->Run(); });
102 LOG(DEBUG) << num << " Worker threads are created";
105 WorkerThread::~WorkerThread() {
109 for (auto& t : threads_)
113 bool WorkerThread::PushQueue(std::shared_ptr<PkgRequest> req) {
115 std::unique_lock<std::mutex> u(lock_);
122 void WorkerThread::Run() {
123 std::unique_ptr<request_handler::AbstractRequestHandler>
124 handler[pkgmgr_common::ReqType::MAX];
125 handler[pkgmgr_common::ReqType::GET_PKG_INFO].reset(
126 new request_handler::GetPkginfoRequestHandler());
127 handler[pkgmgr_common::ReqType::GET_APP_INFO].reset(
128 new request_handler::GetAppinfoRequestHandler());
129 handler[pkgmgr_common::ReqType::SET_PKG_INFO].reset(
130 new request_handler::SetPkginfoRequestHandler());
131 handler[pkgmgr_common::ReqType::SET_CERT_INFO].reset(
132 new request_handler::SetCertRequestHandler());
133 handler[pkgmgr_common::ReqType::GET_CERT_INFO].reset(
134 new request_handler::GetCertRequestHandler());
135 handler[pkgmgr_common::ReqType::GET_PKG_DEP_INFO].reset(
136 new request_handler::GetDepinfoRequestHandler());
137 handler[pkgmgr_common::ReqType::QUERY].reset(
138 new request_handler::QueryRequestHandler());
139 handler[pkgmgr_common::ReqType::COMMAND].reset(
140 new request_handler::CommandRequestHandler());
141 handler[pkgmgr_common::ReqType::CREATE_DB].reset(
142 new request_handler::CreateDBRequestHandler());
143 handler[pkgmgr_common::ReqType::CREATE_CACHE].reset(
144 new request_handler::CreateCacheRequestHandler());
146 std::unique_ptr<WorkerThread::Scheduler> scheduler(
147 new WorkerThread::Scheduler(gettid()));
149 LOG(DEBUG) << "Initialize request handlers";
151 std::shared_ptr<PkgRequest> req;
153 std::unique_lock<std::mutex> u(lock_);
154 cv_.wait(u, [this] { return !this->queue_.empty() || stop_all_; });
155 if (stop_all_ && queue_.empty())
160 pkgmgr_common::ReqType type = req->GetRequestType();
161 LOG(WARNING) << "Request type: " << pkgmgr_common::ReqTypeToString(type)
162 << " pid: " << req->GetSenderPID();
163 if (type <= pkgmgr_common::ReqType::REQ_TYPE_NONE
164 || type >= pkgmgr_common::ReqType::MAX) {
165 LOG(ERROR) << "Request type is invalid: " << static_cast<int>(type)
166 << ", pid:" << req->GetSenderPID();
173 if (type == pkgmgr_common::ReqType::CREATE_CACHE)
174 scheduler->ChangePolicy();
176 handler[type]->SetPID(req->GetSenderPID());
177 if (!handler[type]->HandleRequest(req->GetData(), req->GetSize(),
178 locale_.GetObject()))
179 LOG(ERROR) << "Failed to handle request";
180 } catch (const std::exception& err) {
181 LOG(ERROR) << "Exception occurred: " << err.what()
182 << ", pid: " << req->GetSenderPID();
184 if (type == pkgmgr_common::ReqType::CREATE_CACHE)
185 scheduler->ResetPolicy();
189 LOG(ERROR) << "Exception occurred pid: " << req->GetSenderPID();
191 if (type == pkgmgr_common::ReqType::CREATE_CACHE)
192 scheduler->ResetPolicy();
197 std::vector<uint8_t> result_data = handler[type]->ExtractResult();
198 if (req->SendData(result_data.data(), result_data.size()) == false) {
199 LOG(ERROR) << "Failed to send response pid: " << req->GetSenderPID();
203 if (type == pkgmgr_common::ReqType::CREATE_CACHE)
204 scheduler->ResetPolicy();
206 LOG(WARNING) << "Success response pid: " << req->GetSenderPID();
210 void WorkerThread::SetMemoryTrimTimer() {
211 std::lock_guard<std::recursive_mutex> lock(mutex_);
213 g_source_remove(timer_);
215 timer_ = g_timeout_add_seconds_full(G_PRIORITY_LOW, 10,
216 TrimMemory, this, NULL);
219 gboolean WorkerThread::TrimMemory(void* data) {
220 LOG(DEBUG) << "Trim memory";
221 auto* h = static_cast<WorkerThread*>(data);
223 std::lock_guard<std::recursive_mutex> lock(h->mutex_);
227 if (database::DBHandleProvider::IsCrashedWriteRequest())
228 database::DBHandleProvider::GetInst(getuid()).UnsetMemoryMode(getpid());
230 sqlite3_release_memory(-1);
232 return G_SOURCE_REMOVE;
235 std::shared_ptr<PkgRequest> WorkerThread::PopQueue() {
236 SetMemoryTrimTimer();
237 auto req = queue_.front();
242 void WorkerThread::SetLocale(std::string locale) {
243 LOG(DEBUG) << "Change locale : " << locale_.GetObject()
245 locale_.SetObject(std::move(locale));
248 void WorkerThread::SendError(std::shared_ptr<PkgRequest> req) {
249 pkgmgr_common::parcel::AbstractParcelable parcelable(
250 0, pkgmgr_common::parcel::ParcelableType::Unknown, PMINFO_R_ERROR);
251 tizen_base::Parcel p;
252 p.WriteParcelable(parcelable);
253 std::vector<uint8_t> raw = p.GetRaw();
254 req->SendData(&raw[0], raw.size());
257 } // namespace pkgmgr_server