Change scheduling policy to a round-robin policy
[platform/core/appfw/pkgmgr-info.git] / src / server / worker_thread.cc
1 /*
2  * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "worker_thread.hh"
18
19 #include <sqlite3.h>
20 #include <malloc.h>
21
22 #include "abstract_parcelable.hh"
23 #include "command_request_handler.hh"
24 #include "create_cache_request_handler.hh"
25 #include "create_db_request_handler.hh"
26 #include "db_handle_provider.hh"
27 #include "get_appinfo_request_handler.hh"
28 #include "get_cert_request_handler.hh"
29 #include "get_depinfo_request_handler.hh"
30 #include "get_pkginfo_request_handler.hh"
31 #include "query_request_handler.hh"
32 #include "set_cert_request_handler.hh"
33 #include "set_pkginfo_request_handler.hh"
34 #include "utils/logging.hh"
35
36 #include "pkgmgrinfo_debug.h"
37
38 #ifdef LOG_TAG
39 #undef LOG_TAG
40 #endif
41 #define LOG_TAG "PKGMGR_INFO"
42
43 #ifndef SQLITE_ENABLE_MEMORY_MANAGEMENT
44 #define SQLITE_ENABLE_MEMORY_MANAGEMENT
45 #endif
46
47 namespace pkgmgr_server {
48
49 WorkerThread::Scheduler::Scheduler(pid_t tid) : tid_(tid) {
50   Get(&policy_, &param_);
51 }
52
53 void WorkerThread::Scheduler::ChangePolicy() {
54   struct sched_param param = { 0, };
55   param.sched_priority = 1;
56   if (Set(SCHED_RR, &param) != 0)
57     return;
58
59   int policy = -1;
60   Get(&policy, &param);
61   LOG(WARNING) << "Current policy: " << policy << ", priority: "
62                << param.sched_priority;
63 }
64
65 void WorkerThread::Scheduler::ResetPolicy() {
66   if (Set(policy_, &param_) != 0)
67     return;
68
69   struct sched_param param = { 0, };
70   int policy = -1;
71   Get(&policy, &param);
72   LOG(WARNING) << "Current policy: " << policy << ", priority: "
73                << param.sched_priority;
74 }
75
76 void WorkerThread::Scheduler::Get(int* policy, struct sched_param* param) {
77   if (sched_getparam(tid_, param) != 0)
78     LOG(ERROR) << "sched_getparam() is failed. errno: " << errno;
79
80   *policy = sched_getscheduler(tid_);
81   if (*policy < 0)
82     LOG(ERROR) << "sched_getscheduler() is failed. errno: " << errno;
83 }
84
85 int WorkerThread::Scheduler::Set(int policy, struct sched_param* param) {
86   if (sched_setscheduler(tid_, policy, param) != 0) {
87     LOG(ERROR) << "sched_setscheduler() is failed. policy: " << policy
88                << ", errno: " << errno;
89     return -1;
90   }
91
92   LOG(WARNING) << "policy: " << policy << ", sched_priority: "
93                << param->sched_priority;
94   return 0;
95 }
96
97 WorkerThread::WorkerThread(unsigned int num) : stop_all_(false) {
98   threads_.reserve(num);
99   for (unsigned int i = 0; i < num; ++i)
100     threads_.emplace_back([this]() -> void { this->Run(); });
101
102   LOG(DEBUG) << num << " Worker threads are created";
103 }
104
105 WorkerThread::~WorkerThread() {
106   stop_all_ = true;
107   cv_.notify_all();
108
109   for (auto& t : threads_)
110     t.join();
111 }
112
113 bool WorkerThread::PushQueue(std::shared_ptr<PkgRequest> req) {
114   {
115     std::unique_lock<std::mutex> u(lock_);
116     queue_.push(req);
117     cv_.notify_one();
118   }
119   return true;
120 }
121
122 void WorkerThread::Run() {
123   std::unique_ptr<request_handler::AbstractRequestHandler>
124       handler[pkgmgr_common::ReqType::MAX];
125   handler[pkgmgr_common::ReqType::GET_PKG_INFO].reset(
126       new request_handler::GetPkginfoRequestHandler());
127   handler[pkgmgr_common::ReqType::GET_APP_INFO].reset(
128       new request_handler::GetAppinfoRequestHandler());
129   handler[pkgmgr_common::ReqType::SET_PKG_INFO].reset(
130       new request_handler::SetPkginfoRequestHandler());
131   handler[pkgmgr_common::ReqType::SET_CERT_INFO].reset(
132       new request_handler::SetCertRequestHandler());
133   handler[pkgmgr_common::ReqType::GET_CERT_INFO].reset(
134       new request_handler::GetCertRequestHandler());
135   handler[pkgmgr_common::ReqType::GET_PKG_DEP_INFO].reset(
136       new request_handler::GetDepinfoRequestHandler());
137   handler[pkgmgr_common::ReqType::QUERY].reset(
138       new request_handler::QueryRequestHandler());
139   handler[pkgmgr_common::ReqType::COMMAND].reset(
140       new request_handler::CommandRequestHandler());
141   handler[pkgmgr_common::ReqType::CREATE_DB].reset(
142       new request_handler::CreateDBRequestHandler());
143   handler[pkgmgr_common::ReqType::CREATE_CACHE].reset(
144       new request_handler::CreateCacheRequestHandler());
145
146   std::unique_ptr<WorkerThread::Scheduler> scheduler(
147       new WorkerThread::Scheduler(gettid()));
148
149   LOG(DEBUG) << "Initialize request handlers";
150   while (true) {
151     std::shared_ptr<PkgRequest> req;
152     {
153       std::unique_lock<std::mutex> u(lock_);
154       cv_.wait(u, [this] { return !this->queue_.empty() || stop_all_; });
155       if (stop_all_ && queue_.empty())
156         return;
157       req = PopQueue();
158     }
159
160     pkgmgr_common::ReqType type = req->GetRequestType();
161     LOG(WARNING) << "Request type: " << pkgmgr_common::ReqTypeToString(type)
162         << " pid: " << req->GetSenderPID();
163     if (type <= pkgmgr_common::ReqType::REQ_TYPE_NONE
164             || type >= pkgmgr_common::ReqType::MAX) {
165       LOG(ERROR) << "Request type is invalid: " << static_cast<int>(type)
166           << ",  pid:" << req->GetSenderPID();
167       SendError(req);
168
169       continue;
170     }
171
172     try {
173       if (type == pkgmgr_common::ReqType::CREATE_CACHE)
174         scheduler->ChangePolicy();
175
176       handler[type]->SetPID(req->GetSenderPID());
177       if (!handler[type]->HandleRequest(req->GetData(), req->GetSize(),
178                                         locale_.GetObject()))
179         LOG(ERROR) << "Failed to handle request";
180     } catch (const std::exception& err) {
181       LOG(ERROR) << "Exception occurred: " << err.what()
182           << ", pid: " << req->GetSenderPID();
183       SendError(req);
184       if (type == pkgmgr_common::ReqType::CREATE_CACHE)
185         scheduler->ResetPolicy();
186
187       continue;
188     } catch (...) {
189       LOG(ERROR) << "Exception occurred pid: " << req->GetSenderPID();
190       SendError(req);
191       if (type == pkgmgr_common::ReqType::CREATE_CACHE)
192         scheduler->ResetPolicy();
193
194       continue;
195     }
196
197     std::vector<uint8_t> result_data = handler[type]->ExtractResult();
198     if (req->SendData(result_data.data(), result_data.size()) == false) {
199       LOG(ERROR) << "Failed to send response pid: " << req->GetSenderPID();
200       continue;
201     }
202
203     if (type == pkgmgr_common::ReqType::CREATE_CACHE)
204       scheduler->ResetPolicy();
205
206     LOG(WARNING) << "Success response pid: " <<  req->GetSenderPID();
207   }
208 }
209
210 void WorkerThread::SetMemoryTrimTimer() {
211   std::lock_guard<std::recursive_mutex> lock(mutex_);
212   if (timer_ > 0)
213     g_source_remove(timer_);
214
215   timer_ = g_timeout_add_seconds_full(G_PRIORITY_LOW, 10,
216       TrimMemory, this, NULL);
217 }
218
219 gboolean WorkerThread::TrimMemory(void* data) {
220   LOG(DEBUG) << "Trim memory";
221   auto* h = static_cast<WorkerThread*>(data);
222   {
223     std::lock_guard<std::recursive_mutex> lock(h->mutex_);
224     h->timer_ = 0;
225   }
226
227   if (database::DBHandleProvider::IsCrashedWriteRequest())
228     database::DBHandleProvider::GetInst(getuid()).UnsetMemoryMode(getpid());
229
230   sqlite3_release_memory(-1);
231   malloc_trim(0);
232   return G_SOURCE_REMOVE;
233 }
234
235 std::shared_ptr<PkgRequest> WorkerThread::PopQueue() {
236   SetMemoryTrimTimer();
237   auto req = queue_.front();
238   queue_.pop();
239   return req;
240 }
241
242 void WorkerThread::SetLocale(std::string locale) {
243   LOG(DEBUG) << "Change locale : " << locale_.GetObject()
244       << " -> "  << locale;
245   locale_.SetObject(std::move(locale));
246 }
247
248 void WorkerThread::SendError(std::shared_ptr<PkgRequest> req) {
249   pkgmgr_common::parcel::AbstractParcelable parcelable(
250       0, pkgmgr_common::parcel::ParcelableType::Unknown, PMINFO_R_ERROR);
251   tizen_base::Parcel p;
252   p.WriteParcelable(parcelable);
253   std::vector<uint8_t> raw = p.GetRaw();
254   req->SendData(&raw[0], raw.size());
255 }
256
257 }  // namespace pkgmgr_server