From fab7bca0690188d34fbb5242998cab2c5ae6e485 Mon Sep 17 00:00:00 2001 From: Hwankyu Jhun Date: Thu, 28 Apr 2022 09:37:44 +0900 Subject: [PATCH] Change scheduling policy to a round-robin policy Before creating the cache information(CREATE_CACHE), the worker thread of pkginfo-server adjusts scheduling policy to the round-robin policy to improve the performance temporaily. And, the CREATE_CACHE command is started after the worker threads are created immediately. Calling TrimCache() is removed from TrimMemory(). Change-Id: I1ecc2aabdbdb5f171b411704bd5cbef15c0e17b1 Signed-off-by: Hwankyu Jhun --- packaging/pkgmgr-info.service | 2 +- src/server/runner.cc | 4 ++ src/server/worker_thread.cc | 91 ++++++++++++++++++++++++++++++++++++------- src/server/worker_thread.hh | 24 +++++++++++- 4 files changed, 104 insertions(+), 17 deletions(-) diff --git a/packaging/pkgmgr-info.service b/packaging/pkgmgr-info.service index fac20e3..a2c1b00 100755 --- a/packaging/pkgmgr-info.service +++ b/packaging/pkgmgr-info.service @@ -6,7 +6,7 @@ After=systemd-tmpfiles-setup.service pkgmgr-info.socket [Service] SmackProcessLabel=System -Capabilities=cap_dac_override=i +Capabilities=cap_dac_override,cap_sys_nice=i SecureBits=keep-caps ExecStart=/usr/bin/pkginfo-server Type=notify diff --git a/src/server/runner.cc b/src/server/runner.cc index 52e8211..b5725d1 100644 --- a/src/server/runner.cc +++ b/src/server/runner.cc @@ -69,6 +69,10 @@ Runner::Runner(unsigned int thread_num) { sid_ = g_unix_fd_add(server_->GetFd(), condition, OnReceiveRequest, this); pkgmgr_common::SystemLocale::GetInst().RegisterEvent(this); thread_pool_->SetLocale(pkgmgr_common::SystemLocale::GetInst().Get()); + + QueueRequest( + std::make_shared( + tzplatform_getuid(TZ_SYS_DEFAULT_USER))); LOGI("Start Runner"); } diff --git a/src/server/worker_thread.cc b/src/server/worker_thread.cc index 08f2929..c377a8f 100644 --- a/src/server/worker_thread.cc +++ b/src/server/worker_thread.cc @@ -46,6 +46,54 @@ namespace pkgmgr_server { +WorkerThread::Scheduler::Scheduler(pid_t tid) : tid_(tid) { + Get(&policy_, ¶m_); +} + +void WorkerThread::Scheduler::ChangePolicy() { + struct sched_param param = { 0, }; + param.sched_priority = 1; + if (Set(SCHED_RR, ¶m) != 0) + return; + + int policy = -1; + Get(&policy, ¶m); + LOG(WARNING) << "Current policy: " << policy << ", priority: " + << param.sched_priority; +} + +void WorkerThread::Scheduler::ResetPolicy() { + if (Set(policy_, ¶m_) != 0) + return; + + struct sched_param param = { 0, }; + int policy = -1; + Get(&policy, ¶m); + LOG(WARNING) << "Current policy: " << policy << ", priority: " + << param.sched_priority; +} + +void WorkerThread::Scheduler::Get(int* policy, struct sched_param* param) { + if (sched_getparam(tid_, param) != 0) + LOG(ERROR) << "sched_getparam() is failed. errno: " << errno; + + *policy = sched_getscheduler(tid_); + if (*policy < 0) + LOG(ERROR) << "sched_getscheduler() is failed. errno: " << errno; +} + +int WorkerThread::Scheduler::Set(int policy, struct sched_param* param) { + if (sched_setscheduler(tid_, policy, param) != 0) { + LOG(ERROR) << "sched_setscheduler() is failed. policy: " << policy + << ", errno: " << errno; + return -1; + } + + LOG(WARNING) << "policy: " << policy << ", sched_priority: " + << param->sched_priority; + return 0; +} + WorkerThread::WorkerThread(unsigned int num) : stop_all_(false) { threads_.reserve(num); for (unsigned int i = 0; i < num; ++i) @@ -95,6 +143,9 @@ void WorkerThread::Run() { handler[pkgmgr_common::ReqType::CREATE_CACHE].reset( new request_handler::CreateCacheRequestHandler()); + std::unique_ptr scheduler( + new WorkerThread::Scheduler(gettid())); + LOG(DEBUG) << "Initialize request handlers"; while (true) { std::shared_ptr req; @@ -119,6 +170,9 @@ void WorkerThread::Run() { } try { + if (type == pkgmgr_common::ReqType::CREATE_CACHE) + scheduler->ChangePolicy(); + handler[type]->SetPID(req->GetSenderPID()); if (!handler[type]->HandleRequest(req->GetData(), req->GetSize(), locale_.GetObject())) @@ -127,11 +181,15 @@ void WorkerThread::Run() { LOG(ERROR) << "Exception occurred: " << err.what() << ", pid: " << req->GetSenderPID(); SendError(req); + if (type == pkgmgr_common::ReqType::CREATE_CACHE) + scheduler->ResetPolicy(); continue; } catch (...) { LOG(ERROR) << "Exception occurred pid: " << req->GetSenderPID(); SendError(req); + if (type == pkgmgr_common::ReqType::CREATE_CACHE) + scheduler->ResetPolicy(); continue; } @@ -141,34 +199,37 @@ void WorkerThread::Run() { LOG(ERROR) << "Failed to send response pid: " << req->GetSenderPID(); continue; } + + if (type == pkgmgr_common::ReqType::CREATE_CACHE) + scheduler->ResetPolicy(); + LOG(WARNING) << "Success response pid: " << req->GetSenderPID(); } } void WorkerThread::SetMemoryTrimTimer() { - static guint timer = 0; - if (timer > 0) - g_source_remove(timer); + std::lock_guard lock(mutex_); + if (timer_ > 0) + g_source_remove(timer_); - timer = g_timeout_add_seconds_full(G_PRIORITY_LOW, 3, - TrimMemory, &timer, NULL); + timer_ = g_timeout_add_seconds_full(G_PRIORITY_LOW, 10, + TrimMemory, this, NULL); } gboolean WorkerThread::TrimMemory(void* data) { LOG(DEBUG) << "Trim memory"; - guint* timer = static_cast(data); - sqlite3_release_memory(-1); - malloc_trim(0); - *timer = 0; + auto* h = static_cast(data); + { + std::lock_guard lock(h->mutex_); + h->timer_ = 0; + } if (database::DBHandleProvider::IsCrashedWriteRequest()) - database::DBHandleProvider:: - GetInst(getuid()).UnsetMemoryMode(getpid()); + database::DBHandleProvider::GetInst(getuid()).UnsetMemoryMode(getpid()); - database::DBHandleProvider:: - GetInst(getuid()).TrimCache(); - - return false; + sqlite3_release_memory(-1); + malloc_trim(0); + return G_SOURCE_REMOVE; } std::shared_ptr WorkerThread::PopQueue() { diff --git a/src/server/worker_thread.hh b/src/server/worker_thread.hh index 809ca88..4775cb8 100644 --- a/src/server/worker_thread.hh +++ b/src/server/worker_thread.hh @@ -18,6 +18,8 @@ #define SERVER_WORKER_THREAD_HH_ #include +#include +#include #include #include @@ -46,17 +48,37 @@ class EXPORT_API WorkerThread { private: void Run(); void SendError(std::shared_ptr req); - static void SetMemoryTrimTimer(); + void SetMemoryTrimTimer(); static gboolean TrimMemory(void* data); std::shared_ptr PopQueue(); private: + class Scheduler { + public: + Scheduler(pid_t tid); + + void ChangePolicy(); + void ResetPolicy(); + + private: + void Get(int* policy, struct sched_param* param); + int Set(int policy, struct sched_param* param); + + private: + pid_t tid_; + int policy_ = 0; + struct sched_param param_ = { 0, }; + }; + + private: bool stop_all_; utils::SharedObject locale_; std::queue> queue_; std::vector threads_; std::mutex lock_; std::condition_variable cv_; + std::recursive_mutex mutex_; + guint timer_ = 0; }; } // namespace pkgmgr_server -- 2.7.4