namespace pkgmgr_server {
+WorkerThread::Scheduler::Scheduler(pid_t tid) : tid_(tid) {
+ Get(&policy_, ¶m_);
+}
+
+void WorkerThread::Scheduler::ChangePolicy() {
+ struct sched_param param = { 0, };
+ param.sched_priority = 1;
+ if (Set(SCHED_RR, ¶m) != 0)
+ return;
+
+ int policy = -1;
+ Get(&policy, ¶m);
+ LOG(WARNING) << "Current policy: " << policy << ", priority: "
+ << param.sched_priority;
+}
+
+void WorkerThread::Scheduler::ResetPolicy() {
+ if (Set(policy_, ¶m_) != 0)
+ return;
+
+ struct sched_param param = { 0, };
+ int policy = -1;
+ Get(&policy, ¶m);
+ LOG(WARNING) << "Current policy: " << policy << ", priority: "
+ << param.sched_priority;
+}
+
+void WorkerThread::Scheduler::Get(int* policy, struct sched_param* param) {
+ if (sched_getparam(tid_, param) != 0)
+ LOG(ERROR) << "sched_getparam() is failed. errno: " << errno;
+
+ *policy = sched_getscheduler(tid_);
+ if (*policy < 0)
+ LOG(ERROR) << "sched_getscheduler() is failed. errno: " << errno;
+}
+
+int WorkerThread::Scheduler::Set(int policy, struct sched_param* param) {
+ if (sched_setscheduler(tid_, policy, param) != 0) {
+ LOG(ERROR) << "sched_setscheduler() is failed. policy: " << policy
+ << ", errno: " << errno;
+ return -1;
+ }
+
+ LOG(WARNING) << "policy: " << policy << ", sched_priority: "
+ << param->sched_priority;
+ return 0;
+}
+
WorkerThread::WorkerThread(unsigned int num) : stop_all_(false) {
threads_.reserve(num);
for (unsigned int i = 0; i < num; ++i)
handler[pkgmgr_common::ReqType::CREATE_CACHE].reset(
new request_handler::CreateCacheRequestHandler());
+ std::unique_ptr<WorkerThread::Scheduler> scheduler(
+ new WorkerThread::Scheduler(gettid()));
+
LOG(DEBUG) << "Initialize request handlers";
while (true) {
std::shared_ptr<PkgRequest> req;
}
try {
+ if (type == pkgmgr_common::ReqType::CREATE_CACHE)
+ scheduler->ChangePolicy();
+
handler[type]->SetPID(req->GetSenderPID());
if (!handler[type]->HandleRequest(req->GetData(), req->GetSize(),
locale_.GetObject()))
LOG(ERROR) << "Exception occurred: " << err.what()
<< ", pid: " << req->GetSenderPID();
SendError(req);
+ if (type == pkgmgr_common::ReqType::CREATE_CACHE)
+ scheduler->ResetPolicy();
continue;
} catch (...) {
LOG(ERROR) << "Exception occurred pid: " << req->GetSenderPID();
SendError(req);
+ if (type == pkgmgr_common::ReqType::CREATE_CACHE)
+ scheduler->ResetPolicy();
continue;
}
LOG(ERROR) << "Failed to send response pid: " << req->GetSenderPID();
continue;
}
+
+ if (type == pkgmgr_common::ReqType::CREATE_CACHE)
+ scheduler->ResetPolicy();
+
LOG(WARNING) << "Success response pid: " << req->GetSenderPID();
}
}
void WorkerThread::SetMemoryTrimTimer() {
- static guint timer = 0;
- if (timer > 0)
- g_source_remove(timer);
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ if (timer_ > 0)
+ g_source_remove(timer_);
- timer = g_timeout_add_seconds_full(G_PRIORITY_LOW, 3,
- TrimMemory, &timer, NULL);
+ timer_ = g_timeout_add_seconds_full(G_PRIORITY_LOW, 10,
+ TrimMemory, this, NULL);
}
gboolean WorkerThread::TrimMemory(void* data) {
LOG(DEBUG) << "Trim memory";
- guint* timer = static_cast<guint*>(data);
- sqlite3_release_memory(-1);
- malloc_trim(0);
- *timer = 0;
+ auto* h = static_cast<WorkerThread*>(data);
+ {
+ std::lock_guard<std::recursive_mutex> lock(h->mutex_);
+ h->timer_ = 0;
+ }
if (database::DBHandleProvider::IsCrashedWriteRequest())
- database::DBHandleProvider::
- GetInst(getuid()).UnsetMemoryMode(getpid());
+ database::DBHandleProvider::GetInst(getuid()).UnsetMemoryMode(getpid());
- database::DBHandleProvider::
- GetInst(getuid()).TrimCache();
-
- return false;
+ sqlite3_release_memory(-1);
+ malloc_trim(0);
+ return G_SOURCE_REMOVE;
}
std::shared_ptr<PkgRequest> WorkerThread::PopQueue() {
#define SERVER_WORKER_THREAD_HH_
#include <glib-2.0/glib.h>
+#include <sched.h>
+#include <sys/types.h>
#include <condition_variable>
#include <memory>
private:
void Run();
void SendError(std::shared_ptr<PkgRequest> req);
- static void SetMemoryTrimTimer();
+ void SetMemoryTrimTimer();
static gboolean TrimMemory(void* data);
std::shared_ptr<PkgRequest> PopQueue();
private:
+ class Scheduler {
+ public:
+ Scheduler(pid_t tid);
+
+ void ChangePolicy();
+ void ResetPolicy();
+
+ private:
+ void Get(int* policy, struct sched_param* param);
+ int Set(int policy, struct sched_param* param);
+
+ private:
+ pid_t tid_;
+ int policy_ = 0;
+ struct sched_param param_ = { 0, };
+ };
+
+ private:
bool stop_all_;
utils::SharedObject<std::string> locale_;
std::queue<std::shared_ptr<PkgRequest>> queue_;
std::vector<std::thread> threads_;
std::mutex lock_;
std::condition_variable cv_;
+ std::recursive_mutex mutex_;
+ guint timer_ = 0;
};
} // namespace pkgmgr_server