Remove TODOs
[platform/core/appfw/pkgmgr-info.git] / src / server / worker_thread.cc
1 /*
2  * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "worker_thread.hh"
18
19 #include <sqlite3.h>
20 #include <sys/types.h>
21 #include <malloc.h>
22
23 #include <tzplatform_config.h>
24
25 #include "abstract_parcelable.hh"
26 #include "request_handler_factory.hh"
27 #include "server/database/db_handle_provider.hh"
28 #include "utils/logging.hh"
29
30 #include "pkgmgrinfo_debug.h"
31
32 #ifdef LOG_TAG
33 #undef LOG_TAG
34 #endif
35 #define LOG_TAG "PKGMGR_INFO"
36
37 #ifndef SQLITE_ENABLE_MEMORY_MANAGEMENT
38 #define SQLITE_ENABLE_MEMORY_MANAGEMENT
39 #endif
40
41 namespace {
42
43 uid_t globaluser_uid = -1;
44
45 uid_t GetGlobalUID() {
46   if (globaluser_uid == (uid_t)-1)
47     globaluser_uid = tzplatform_getuid(TZ_SYS_GLOBALAPP_USER);
48
49   return globaluser_uid;
50 }
51
52 uid_t ConvertUID(uid_t uid) {
53   if (uid < REGULAR_USER)
54     return GetGlobalUID();
55   else
56     return uid;
57 }
58
59 }  // namespace
60
61 namespace pkgmgr_server {
62
63 WorkerThread::WorkerThread(unsigned int num) : stop_all_(false) {
64   threads_.reserve(num);
65   for (unsigned int i = 0; i < num; ++i)
66     threads_.emplace_back([this]() -> void { this->Run(); });
67
68   LOG(DEBUG) << num << " Worker threads are created";
69 }
70
71 WorkerThread::~WorkerThread() {
72   stop_all_ = true;
73   cv_.notify_all();
74
75   for (auto& t : threads_)
76     t.join();
77 }
78
79 bool WorkerThread::PushQueue(std::shared_ptr<PkgRequest> req) {
80   {
81     std::unique_lock<std::mutex> u(lock_);
82     queue_.push(req);
83     cv_.notify_one();
84   }
85   return true;
86 }
87
88 std::shared_ptr<PkgRequest> WorkerThread::PopQueue() {
89   SetMemoryTrimTimer();
90   std::unique_lock<std::mutex> u(lock_);
91   cv_.wait(u, [this] { return !this->queue_.empty() || stop_all_; });
92   if (stop_all_ && queue_.empty())
93     return nullptr;
94
95   auto req = queue_.front();
96   queue_.pop();
97   return req;
98 }
99
100 void WorkerThread::Run() {
101   RequestHandlerFactory factory;
102   LOG(DEBUG) << "Initialize request handlers";
103   while (true) {
104     std::shared_ptr<PkgRequest> req = PopQueue();
105     if (req == nullptr)
106       return;
107
108     auto type = req->GetRequestType();
109     LOG(WARNING) << "Request type: " << pkgmgr_common::ReqTypeToString(type)
110                  << " pid: " << req->GetSenderPID();
111     auto handler = factory.GetRequestHandler(type);
112     if (handler == nullptr)
113       continue;
114
115     try {
116       handler->PreExec();
117       handler->SetUID(ConvertUID(req->GetSenderUID()));
118       handler->SetPID(req->GetSenderPID());
119       if (!handler->HandleRequest(req->GetData(), req->GetSize(),
120           locale_.GetObject()))
121         LOG(ERROR) << "Failed to handle request";
122
123       std::vector<uint8_t> result_data = handler->ExtractResult();
124       if (req->SendData(result_data.data(), result_data.size()) == false)
125         LOG(ERROR) << "Failed to send response pid: " << req->GetSenderPID();
126       else
127         LOG(WARNING) << "Success response pid: " << req->GetSenderPID();
128     } catch (const std::exception& err) {
129       LOG(ERROR) << "Exception occurred: " << err.what()
130                  << ", pid: " << req->GetSenderPID();
131       SendError(req);
132     } catch (...) {
133       LOG(ERROR) << "Exception occurred pid: " << req->GetSenderPID();
134       SendError(req);
135     }
136
137     handler->PostExec();
138   }
139 }
140
141 void WorkerThread::SetMemoryTrimTimer() {
142   std::lock_guard<std::recursive_mutex> lock(mutex_);
143   if (timer_ > 0)
144     g_source_remove(timer_);
145
146   timer_ = g_timeout_add_seconds_full(G_PRIORITY_LOW, 10,
147       TrimMemory, this, NULL);
148 }
149
150 gboolean WorkerThread::TrimMemory(void* data) {
151   LOG(DEBUG) << "Trim memory";
152   auto* h = static_cast<WorkerThread*>(data);
153   {
154     std::lock_guard<std::recursive_mutex> lock(h->mutex_);
155     h->timer_ = 0;
156   }
157
158   if (database::DBHandleProvider::IsCrashedWriteRequest())
159     database::DBHandleProvider::GetInst(getuid()).UnsetMemoryMode(getpid());
160
161   sqlite3_release_memory(-1);
162   malloc_trim(0);
163   return G_SOURCE_REMOVE;
164 }
165
166 void WorkerThread::SetLocale(std::string locale) {
167   LOG(DEBUG) << "Change locale : " << locale_.GetObject()
168       << " -> "  << locale;
169   locale_.SetObject(std::move(locale));
170 }
171
172 void WorkerThread::SendError(const std::shared_ptr<PkgRequest>& req) {
173   pkgmgr_common::parcel::AbstractParcelable parcelable(
174       0, pkgmgr_common::parcel::ParcelableType::Unknown, PMINFO_R_ERROR);
175   tizen_base::Parcel p;
176   p.WriteParcelable(parcelable);
177   std::vector<uint8_t> raw = p.GetRaw();
178   req->SendData(&raw[0], raw.size());
179 }
180
181 }  // namespace pkgmgr_server