Merge "Refactor WorkerThread" into tizen
[platform/core/appfw/pkgmgr-info.git] / src / server / worker_thread.cc
1 /*
2  * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "worker_thread.hh"
18
19 #include <sqlite3.h>
20 #include <malloc.h>
21
22 #include "abstract_parcelable.hh"
23 #include "request_handler_factory.hh"
24 #include "server/database/db_handle_provider.hh"
25 #include "utils/logging.hh"
26
27 #include "pkgmgrinfo_debug.h"
28
29 #ifdef LOG_TAG
30 #undef LOG_TAG
31 #endif
32 #define LOG_TAG "PKGMGR_INFO"
33
34 #ifndef SQLITE_ENABLE_MEMORY_MANAGEMENT
35 #define SQLITE_ENABLE_MEMORY_MANAGEMENT
36 #endif
37
38 namespace pkgmgr_server {
39
40 WorkerThread::WorkerThread(unsigned int num) : stop_all_(false) {
41   threads_.reserve(num);
42   for (unsigned int i = 0; i < num; ++i)
43     threads_.emplace_back([this]() -> void { this->Run(); });
44
45   LOG(DEBUG) << num << " Worker threads are created";
46 }
47
48 WorkerThread::~WorkerThread() {
49   stop_all_ = true;
50   cv_.notify_all();
51
52   for (auto& t : threads_)
53     t.join();
54 }
55
56 bool WorkerThread::PushQueue(std::shared_ptr<PkgRequest> req) {
57   {
58     std::unique_lock<std::mutex> u(lock_);
59     queue_.push(req);
60     cv_.notify_one();
61   }
62   return true;
63 }
64
65 std::shared_ptr<PkgRequest> WorkerThread::PopQueue() {
66   SetMemoryTrimTimer();
67   std::unique_lock<std::mutex> u(lock_);
68   cv_.wait(u, [this] { return !this->queue_.empty() || stop_all_; });
69   if (stop_all_ && queue_.empty())
70     return nullptr;
71
72   auto req = queue_.front();
73   queue_.pop();
74   return req;
75 }
76
77 void WorkerThread::Run() {
78   RequestHandlerFactory factory;
79   LOG(DEBUG) << "Initialize request handlers";
80   while (true) {
81     std::shared_ptr<PkgRequest> req = PopQueue();
82     if (req == nullptr)
83       return;
84
85     auto type = req->GetRequestType();
86     LOG(WARNING) << "Request type: " << pkgmgr_common::ReqTypeToString(type)
87                  << " pid: " << req->GetSenderPID();
88     auto handler = factory.GetRequestHandler(type);
89     if (handler == nullptr)
90       continue;
91
92     try {
93       handler->PreExec();
94
95       handler->SetPID(req->GetSenderPID());
96       if (!handler->HandleRequest(req->GetData(), req->GetSize(),
97           locale_.GetObject()))
98         LOG(ERROR) << "Failed to handle request";
99
100       std::vector<uint8_t> result_data = handler->ExtractResult();
101       if (req->SendData(result_data.data(), result_data.size()) == false)
102         LOG(ERROR) << "Failed to send response pid: " << req->GetSenderPID();
103       else
104         LOG(WARNING) << "Success response pid: " << req->GetSenderPID();
105     } catch (const std::exception& err) {
106       LOG(ERROR) << "Exception occurred: " << err.what()
107                  << ", pid: " << req->GetSenderPID();
108       SendError(req);
109     } catch (...) {
110       LOG(ERROR) << "Exception occurred pid: " << req->GetSenderPID();
111       SendError(req);
112     }
113
114     handler->PostExec();
115   }
116 }
117
118 void WorkerThread::SetMemoryTrimTimer() {
119   std::lock_guard<std::recursive_mutex> lock(mutex_);
120   if (timer_ > 0)
121     g_source_remove(timer_);
122
123   timer_ = g_timeout_add_seconds_full(G_PRIORITY_LOW, 10,
124       TrimMemory, this, NULL);
125 }
126
127 gboolean WorkerThread::TrimMemory(void* data) {
128   LOG(DEBUG) << "Trim memory";
129   auto* h = static_cast<WorkerThread*>(data);
130   {
131     std::lock_guard<std::recursive_mutex> lock(h->mutex_);
132     h->timer_ = 0;
133   }
134
135   if (database::DBHandleProvider::IsCrashedWriteRequest())
136     database::DBHandleProvider::GetInst(getuid()).UnsetMemoryMode(getpid());
137
138   sqlite3_release_memory(-1);
139   malloc_trim(0);
140   return G_SOURCE_REMOVE;
141 }
142
143 void WorkerThread::SetLocale(std::string locale) {
144   LOG(DEBUG) << "Change locale : " << locale_.GetObject()
145       << " -> "  << locale;
146   locale_.SetObject(std::move(locale));
147 }
148
149 void WorkerThread::SendError(const std::shared_ptr<PkgRequest>& req) {
150   pkgmgr_common::parcel::AbstractParcelable parcelable(
151       0, pkgmgr_common::parcel::ParcelableType::Unknown, PMINFO_R_ERROR);
152   tizen_base::Parcel p;
153   p.WriteParcelable(parcelable);
154   std::vector<uint8_t> raw = p.GetRaw();
155   req->SendData(&raw[0], raw.size());
156 }
157
158 }  // namespace pkgmgr_server