Change scheduling policy to a round-robin policy 09/274409/4
authorHwankyu Jhun <h.jhun@samsung.com>
Thu, 28 Apr 2022 00:37:44 +0000 (09:37 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Fri, 29 Apr 2022 05:49:10 +0000 (14:49 +0900)
Before creating the cache information(CREATE_CACHE), the worker thread of
pkginfo-server adjusts scheduling policy to the round-robin policy to improve
the performance temporaily. And, the CREATE_CACHE command is started after
the worker threads are created immediately.
Calling TrimCache() is removed from TrimMemory().

Change-Id: I1ecc2aabdbdb5f171b411704bd5cbef15c0e17b1
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
packaging/pkgmgr-info.service
src/server/runner.cc
src/server/worker_thread.cc
src/server/worker_thread.hh

index fac20e3..a2c1b00 100755 (executable)
@@ -6,7 +6,7 @@ After=systemd-tmpfiles-setup.service pkgmgr-info.socket
 
 [Service]
 SmackProcessLabel=System
-Capabilities=cap_dac_override=i
+Capabilities=cap_dac_override,cap_sys_nice=i
 SecureBits=keep-caps
 ExecStart=/usr/bin/pkginfo-server
 Type=notify
index 52e8211..b5725d1 100644 (file)
@@ -69,6 +69,10 @@ Runner::Runner(unsigned int thread_num) {
   sid_ = g_unix_fd_add(server_->GetFd(), condition, OnReceiveRequest, this);
   pkgmgr_common::SystemLocale::GetInst().RegisterEvent(this);
   thread_pool_->SetLocale(pkgmgr_common::SystemLocale::GetInst().Get());
+
+  QueueRequest(
+      std::make_shared<CreateCacheRequest>(
+          tzplatform_getuid(TZ_SYS_DEFAULT_USER)));
   LOGI("Start Runner");
 }
 
index 08f2929..c377a8f 100644 (file)
 
 namespace pkgmgr_server {
 
+WorkerThread::Scheduler::Scheduler(pid_t tid) : tid_(tid) {
+  Get(&policy_, &param_);
+}
+
+void WorkerThread::Scheduler::ChangePolicy() {
+  struct sched_param param = { 0, };
+  param.sched_priority = 1;
+  if (Set(SCHED_RR, &param) != 0)
+    return;
+
+  int policy = -1;
+  Get(&policy, &param);
+  LOG(WARNING) << "Current policy: " << policy << ", priority: "
+               << param.sched_priority;
+}
+
+void WorkerThread::Scheduler::ResetPolicy() {
+  if (Set(policy_, &param_) != 0)
+    return;
+
+  struct sched_param param = { 0, };
+  int policy = -1;
+  Get(&policy, &param);
+  LOG(WARNING) << "Current policy: " << policy << ", priority: "
+               << param.sched_priority;
+}
+
+void WorkerThread::Scheduler::Get(int* policy, struct sched_param* param) {
+  if (sched_getparam(tid_, param) != 0)
+    LOG(ERROR) << "sched_getparam() is failed. errno: " << errno;
+
+  *policy = sched_getscheduler(tid_);
+  if (*policy < 0)
+    LOG(ERROR) << "sched_getscheduler() is failed. errno: " << errno;
+}
+
+int WorkerThread::Scheduler::Set(int policy, struct sched_param* param) {
+  if (sched_setscheduler(tid_, policy, param) != 0) {
+    LOG(ERROR) << "sched_setscheduler() is failed. policy: " << policy
+               << ", errno: " << errno;
+    return -1;
+  }
+
+  LOG(WARNING) << "policy: " << policy << ", sched_priority: "
+               << param->sched_priority;
+  return 0;
+}
+
 WorkerThread::WorkerThread(unsigned int num) : stop_all_(false) {
   threads_.reserve(num);
   for (unsigned int i = 0; i < num; ++i)
@@ -95,6 +143,9 @@ void WorkerThread::Run() {
   handler[pkgmgr_common::ReqType::CREATE_CACHE].reset(
       new request_handler::CreateCacheRequestHandler());
 
+  std::unique_ptr<WorkerThread::Scheduler> scheduler(
+      new WorkerThread::Scheduler(gettid()));
+
   LOG(DEBUG) << "Initialize request handlers";
   while (true) {
     std::shared_ptr<PkgRequest> req;
@@ -119,6 +170,9 @@ void WorkerThread::Run() {
     }
 
     try {
+      if (type == pkgmgr_common::ReqType::CREATE_CACHE)
+        scheduler->ChangePolicy();
+
       handler[type]->SetPID(req->GetSenderPID());
       if (!handler[type]->HandleRequest(req->GetData(), req->GetSize(),
                                         locale_.GetObject()))
@@ -127,11 +181,15 @@ void WorkerThread::Run() {
       LOG(ERROR) << "Exception occurred: " << err.what()
           << ", pid: " << req->GetSenderPID();
       SendError(req);
+      if (type == pkgmgr_common::ReqType::CREATE_CACHE)
+        scheduler->ResetPolicy();
 
       continue;
     } catch (...) {
       LOG(ERROR) << "Exception occurred pid: " << req->GetSenderPID();
       SendError(req);
+      if (type == pkgmgr_common::ReqType::CREATE_CACHE)
+        scheduler->ResetPolicy();
 
       continue;
     }
@@ -141,34 +199,37 @@ void WorkerThread::Run() {
       LOG(ERROR) << "Failed to send response pid: " << req->GetSenderPID();
       continue;
     }
+
+    if (type == pkgmgr_common::ReqType::CREATE_CACHE)
+      scheduler->ResetPolicy();
+
     LOG(WARNING) << "Success response pid: " <<  req->GetSenderPID();
   }
 }
 
 void WorkerThread::SetMemoryTrimTimer() {
-  static guint timer = 0;
-  if (timer > 0)
-    g_source_remove(timer);
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  if (timer_ > 0)
+    g_source_remove(timer_);
 
-  timer = g_timeout_add_seconds_full(G_PRIORITY_LOW, 3,
-      TrimMemory, &timer, NULL);
+  timer_ = g_timeout_add_seconds_full(G_PRIORITY_LOW, 10,
+      TrimMemory, this, NULL);
 }
 
 gboolean WorkerThread::TrimMemory(void* data) {
   LOG(DEBUG) << "Trim memory";
-  guint* timer = static_cast<guint*>(data);
-  sqlite3_release_memory(-1);
-  malloc_trim(0);
-  *timer = 0;
+  auto* h = static_cast<WorkerThread*>(data);
+  {
+    std::lock_guard<std::recursive_mutex> lock(h->mutex_);
+    h->timer_ = 0;
+  }
 
   if (database::DBHandleProvider::IsCrashedWriteRequest())
-    database::DBHandleProvider::
-        GetInst(getuid()).UnsetMemoryMode(getpid());
+    database::DBHandleProvider::GetInst(getuid()).UnsetMemoryMode(getpid());
 
-  database::DBHandleProvider::
-        GetInst(getuid()).TrimCache();
-
-  return false;
+  sqlite3_release_memory(-1);
+  malloc_trim(0);
+  return G_SOURCE_REMOVE;
 }
 
 std::shared_ptr<PkgRequest> WorkerThread::PopQueue() {
index 809ca88..4775cb8 100644 (file)
@@ -18,6 +18,8 @@
 #define SERVER_WORKER_THREAD_HH_
 
 #include <glib-2.0/glib.h>
+#include <sched.h>
+#include <sys/types.h>
 
 #include <condition_variable>
 #include <memory>
@@ -46,17 +48,37 @@ class EXPORT_API WorkerThread {
  private:
   void Run();
   void SendError(std::shared_ptr<PkgRequest> req);
-  static void SetMemoryTrimTimer();
+  void SetMemoryTrimTimer();
   static gboolean TrimMemory(void* data);
   std::shared_ptr<PkgRequest> PopQueue();
 
  private:
+  class Scheduler {
+   public:
+    Scheduler(pid_t tid);
+
+    void ChangePolicy();
+    void ResetPolicy();
+
+   private:
+    void Get(int* policy, struct sched_param* param);
+    int Set(int policy, struct sched_param* param);
+
+   private:
+    pid_t tid_;
+    int policy_ = 0;
+    struct sched_param param_ = { 0, };
+  };
+
+ private:
   bool stop_all_;
   utils::SharedObject<std::string> locale_;
   std::queue<std::shared_ptr<PkgRequest>> queue_;
   std::vector<std::thread> threads_;
   std::mutex lock_;
   std::condition_variable cv_;
+  std::recursive_mutex mutex_;
+  guint timer_ = 0;
 };
 
 }  // namespace pkgmgr_server