Fix inconsistency between cache and database
[platform/core/appfw/pkgmgr-info.git] / src / server / worker_thread.cc
index cd0982c..b623a2c 100644 (file)
  * limitations under the License.
  */
 
-#include <mutex>
-#include <queue>
-#include <thread>
-
-#include "pkg_request.hh"
 #include "worker_thread.hh"
 
+#include <sqlite3.h>
+#include <sys/types.h>
+#include <malloc.h>
+
+#include <tzplatform_config.h>
+
+#include "abstract_parcelable.hh"
+#include "cynara_checker.hh"
+#include "db_change_observer.hh"
+#include "request_handler_factory.hh"
+#include "server/database/db_handle_provider.hh"
+#include "server/database/update_pending_cache_handler.hh"
+#include "utils/logging.hh"
+
+#include "pkgmgrinfo_debug.h"
+
+#ifdef LOG_TAG
+#undef LOG_TAG
+#endif
+#define LOG_TAG "PKGMGR_INFO"
+
+#ifndef SQLITE_ENABLE_MEMORY_MANAGEMENT
+#define SQLITE_ENABLE_MEMORY_MANAGEMENT
+#endif
+
+namespace {
+
+uid_t globaluser_uid = -1;
+
+uid_t GetGlobalUID() {
+  if (globaluser_uid == (uid_t)-1)
+    globaluser_uid = tzplatform_getuid(TZ_SYS_GLOBALAPP_USER);
+
+  return globaluser_uid;
+}
+
+uid_t ConvertUID(uid_t uid) {
+  if (uid < REGULAR_USER)
+    return GetGlobalUID();
+  else
+    return uid;
+}
+
+const char PRIVILEGE_PACKAGE_MANAGER_ADMIN[] =
+    "http://tizen.org/privilege/packagemanager.admin";
+
+std::vector<std::string> GetPrivileges(pkgmgr_common::ReqType type) {
+  std::vector<std::string> ret;
+  if (type == pkgmgr_common::SET_CERT_INFO)
+    ret.emplace_back(PRIVILEGE_PACKAGE_MANAGER_ADMIN);
+  else if (type == pkgmgr_common::SET_PKG_INFO)
+    ret.emplace_back(PRIVILEGE_PACKAGE_MANAGER_ADMIN);
+
+  return ret;
+}
+
+}  // namespace
+
 namespace pkgmgr_server {
 
-WorkerThread::WorkerThread(int num) { /* TODO implement code */ }
+WorkerThread::WorkerThread(unsigned int num) : stop_all_(false) {
+  threads_.reserve(num);
+  for (unsigned int i = 0; i < num; ++i)
+    threads_.emplace_back([this]() -> void { this->Run(); });
+
+  LOG(DEBUG) << num << " Worker threads are created";
+}
+
+WorkerThread::~WorkerThread() {
+  stop_all_ = true;
+  cv_.notify_all();
+
+  for (auto& t : threads_)
+    t.join();
+}
 
 bool WorkerThread::PushQueue(std::shared_ptr<PkgRequest> req) {
-  /* TODO implement code */
+  {
+    std::unique_lock<std::mutex> u(lock_);
+    queue_.push(req);
+    cv_.notify_one();
+  }
   return true;
 }
 
-void Run() { /* TODO implement code */ }
+std::shared_ptr<PkgRequest> WorkerThread::PopQueue() {
+  SetMemoryTrimTimer();
+  std::unique_lock<std::mutex> u(lock_);
+  cv_.wait(u, [this] { return !this->queue_.empty() || stop_all_; });
+  if (stop_all_ && queue_.empty())
+    return nullptr;
+
+  auto req = queue_.front();
+  queue_.pop();
+  return req;
+}
+
+void WorkerThread::Run() {
+  RequestHandlerFactory factory;
+  LOG(DEBUG) << "Initialize request handlers";
+  while (true) {
+    std::shared_ptr<PkgRequest> req = PopQueue();
+    if (req == nullptr)
+      return;
+
+    if (!req->GetPrivilegeChecked()) {
+      if (!req->ReceiveData()) {
+        LOG(ERROR) << "Fail to receive data";
+        continue;
+      }
+
+      pkgmgr_common::ReqType type = req->GetRequestType();
+      if (pkgmgr_common::IsDbWriteRequest(type))
+        StopDbChangeListening();
+      std::vector<std::string> privileges = GetPrivileges(type);
+      if (!CynaraChecker::GetInst().CheckPrivilege(this, req, privileges))
+        continue;
+    }
+
+    auto type = req->GetRequestType();
+    LOG(WARNING) << "Request type: " << pkgmgr_common::ReqTypeToString(type)
+        << " pid: " << req->GetSenderPID() << " tid: " << req->GetSenderTID();
+    auto handler = factory.GetRequestHandler(type);
+    if (handler == nullptr)
+      continue;
+
+    try {
+      handler->PreExec();
+      handler->SetUID(ConvertUID(req->GetSenderUID()));
+      handler->SetPID(req->GetSenderPID());
+      if (!handler->HandleRequest(req->DetachData(), req->GetSize(),
+          locale_.GetObject()))
+        LOG(ERROR) << "Failed to handle request";
+
+      if (req->SendData(handler->ExtractResult()) == false)
+        LOG(ERROR) << "Failed to send response pid: " << req->GetSenderPID();
+      else
+        LOG(WARNING) << "Success response pid: " << req->GetSenderPID()
+            << " tid: " << req->GetSenderTID();
+    } catch (const std::exception& err) {
+      LOG(ERROR) << "Exception occurred: " << err.what()
+                 << ", pid: " << req->GetSenderPID();
+      SendError(req);
+    } catch (...) {
+      LOG(ERROR) << "Exception occurred pid: " << req->GetSenderPID();
+      SendError(req);
+    }
+
+    handler->PostExec();
+  }
+}
+
+void WorkerThread::SetMemoryTrimTimer() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  if (timer_ > 0)
+    g_source_remove(timer_);
+
+  timer_ = g_timeout_add_seconds_full(G_PRIORITY_LOW, 10,
+      TrimMemory, this, NULL);
+}
+
+gboolean WorkerThread::TrimMemory(void* data) {
+  LOG(DEBUG) << "Trim memory";
+  auto* h = static_cast<WorkerThread*>(data);
+  {
+    std::lock_guard<std::recursive_mutex> lock(h->mutex_);
+    h->timer_ = 0;
+  }
+
+  auto crashed_writer_pids =
+      database::DBHandleProvider::CrashedWriteRequestPIDs();
+  if (!crashed_writer_pids.empty()) {
+    database::UpdatePendingCacheHandler db(getuid(), std::move(crashed_writer_pids), {});
+    db.SetLocale(h->locale_.GetObject());
+    db.Execute();
+  }
+
+  sqlite3_release_memory(-1);
+  malloc_trim(0);
+  return G_SOURCE_REMOVE;
+}
+
+void WorkerThread::SetLocale(std::string locale) {
+  LOG(DEBUG) << "Change locale : " << locale_.GetObject()
+      << " -> "  << locale;
+  locale_.SetObject(std::move(locale));
+}
+
+void WorkerThread::SendError(const std::shared_ptr<PkgRequest>& req) {
+  pkgmgr_common::parcel::AbstractParcelable parcelable(
+      0, pkgmgr_common::parcel::ParcelableType::Unknown, PMINFO_R_ERROR);
+  tizen_base::Parcel p;
+  p.WriteParcelable(parcelable);
+  req->SendData(p);
+}
+
+void WorkerThread::StopDbChangeListening() {
+  auto& db_observer = pkgmgr_common::DbChangeObserver::GetInst();
+
+  if (db_observer.GetDisposed())
+    return;
 
-std::shared_ptr<PkgRequest> PopQueue() {
-  /* TODO implement code */
-  return nullptr;
+  LOG(WARNING) << "Try stop listening db change";
+  db_observer.StopListening();
 }
 
 }  // namespace pkgmgr_server