Revert "[Tizen] Revert "AsyncTaskManager overhead reduce""
authorHosang Kim <hosang12.kim@samsung.com>
Wed, 5 Apr 2023 06:33:37 +0000 (15:33 +0900)
committerHosang Kim <hosang12.kim@samsung.com>
Wed, 5 Apr 2023 06:34:06 +0000 (15:34 +0900)
This reverts commit f09181b464aee0cdc28d27c71c4fab9322bd7550.

dali/internal/system/common/async-task-manager-impl.cpp
dali/internal/system/common/async-task-manager-impl.h

index fa9b8c8..16f44ba 100644 (file)
@@ -24,6 +24,9 @@
 #include <dali/devel-api/common/singleton-service.h>
 #include <dali/integration-api/adaptor-framework/adaptor.h>
 #include <dali/integration-api/debug.h>
+#include <dali/public-api/common/vector-wrapper.h>
+
+#include <unordered_map>
 
 namespace Dali
 {
@@ -44,7 +47,7 @@ size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
 {
   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
-  constexpr auto MAX_NUMBER_OF_THREADS = 10u;
+  constexpr auto MAX_NUMBER_OF_THREADS = 16u;
   DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
   return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
 }
@@ -59,10 +62,14 @@ size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t def
 
 #if defined(DEBUG_ENABLED)
 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
+
+uint32_t gThreadId = 0u; // Only for debug
 #endif
 
 } // unnamed namespace
 
+// AsyncTaskThread
+
 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
 : mConditionalWait(),
   mAsyncTaskManager(asyncTaskManager),
@@ -112,7 +119,16 @@ bool AsyncTaskThread::Request()
 
 void AsyncTaskThread::Run()
 {
+#if defined(DEBUG_ENABLED)
+  uint32_t threadId = gThreadId++;
+  {
+    char temp[100];
+    snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
+    SetThreadName(temp);
+  }
+#else
   SetThreadName("AsyncTaskThread");
+#endif
   mLogFactory.InstallLogFunction();
 
   while(!mDestroyThread)
@@ -124,17 +140,99 @@ void AsyncTaskThread::Run()
       if(!mDestroyThread)
       {
         mIsThreadIdle = true;
+        DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
         mConditionalWait.Wait(lock);
+        DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
       }
     }
     else
     {
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
       task->Process();
-      mAsyncTaskManager.CompleteTask(task);
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
+      if(!mDestroyThread)
+      {
+        mAsyncTaskManager.CompleteTask(task);
+      }
     }
   }
 }
 
+// AsyncTaskManager::CacheImpl
+
+struct AsyncTaskManager::CacheImpl
+{
+  CacheImpl(AsyncTaskManager& manager)
+  : mManager(manager)
+  {
+  }
+
+public:
+  // Insert / Erase task cache API.
+
+  /**
+   * @brief Insert cache that input task.
+   * @pre Mutex be locked.
+   */
+  template<typename CacheContainer, typename Iterator>
+  static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
+  {
+    cacheMap[task.Get()].push_back(iterator);
+  }
+
+  /**
+   * @brief Erase cache that input task.
+   * @pre Mutex be locked.
+   */
+  template<typename CacheContainer, typename Iterator>
+  static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
+  {
+    auto mapIter = cacheMap.find(task.Get());
+    if(mapIter != cacheMap.end())
+    {
+      auto& cacheContainer = (*mapIter).second;
+      auto  cacheIter      = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
+
+      if(cacheIter != cacheContainer.end())
+      {
+        cacheContainer.erase(cacheIter);
+        if(cacheContainer.empty())
+        {
+          cacheMap.erase(mapIter);
+        }
+      }
+    }
+  }
+
+  /**
+   * @brief Erase all cache that input task.
+   * @pre Mutex be locked.
+   */
+  template<typename CacheContainer>
+  static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
+  {
+    auto mapIter = cacheMap.find(task.Get());
+    if(mapIter != cacheMap.end())
+    {
+      cacheMap.erase(mapIter);
+    }
+  }
+
+public:
+  AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
+
+  // TODO : Can't we use std::set instead of std::vector?
+  // It will be slowdown If someone AddTask multiple times for same task.
+  using TaskCacheContainer        = std::unordered_map<const AsyncTask*, std::vector<AsyncTaskContainer::iterator>>;
+  using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncRunningTaskContainer::iterator>>;
+
+  TaskCacheContainer        mWaitingTasksCache;   ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
+  RunningTaskCacheContainer mRunningTasksCache;   ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
+  TaskCacheContainer        mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
+};
+
+// AsyncTaskManager
+
 Dali::AsyncTaskManager AsyncTaskManager::Get()
 {
   Dali::AsyncTaskManager manager;
@@ -163,6 +261,8 @@ Dali::AsyncTaskManager AsyncTaskManager::Get()
 AsyncTaskManager::AsyncTaskManager()
 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
   mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
+  mWaitingHighProirityTaskCounts(0u),
+  mCacheImpl(new CacheImpl(*this)),
   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
   mProcessorRegistered(false)
 {
@@ -183,14 +283,29 @@ void AsyncTaskManager::AddTask(AsyncTaskPtr task)
   if(task)
   {
     // Lock while adding task to the queue
-    Mutex::ScopedLock lock(mMutex);
+    Mutex::ScopedLock lock(mWaitingTasksMutex);
 
-    mWaitingTasks.push_back(task);
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
 
-    // Finish all Running threads are working
-    if(mRunningTasks.size() >= mTasks.GetElementCount())
+    // push back into waiting queue.
+    auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
+    CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
+
+    if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
     {
-      return;
+      // Increase the number of waiting tasks for high priority.
+      ++mWaitingHighProirityTaskCounts;
+    }
+
+    {
+      // For thread safety
+      Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+
+      // Finish all Running threads are working
+      if(mRunningTasks.size() >= mTasks.GetElementCount())
+      {
+        return;
+      }
     }
   }
 
@@ -220,114 +335,241 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
 {
   if(task)
   {
-    // Lock while remove task from the queue
-    Mutex::ScopedLock lock(mMutex);
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
+
+    // Check whether we need to unregister processor.
+    // If there is some non-empty queue exist, we don't need to check unregister.
+    bool needCheckUnregisterProcessor = true;
 
-    if(!mWaitingTasks.empty())
     {
-      for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
+      // Lock while remove task from the queue
+      Mutex::ScopedLock lock(mWaitingTasksMutex);
+
+      auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
+      if(mapIter != mCacheImpl->mWaitingTasksCache.end())
       {
-        if((*it) && (*it) == task)
-        {
-          it = mWaitingTasks.erase(it);
-        }
-        else
+        for(auto iterator : mapIter->second)
         {
-          it++;
+          if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
+          {
+            // Decrease the number of waiting tasks for high priority.
+            --mWaitingHighProirityTaskCounts;
+          }
+          mWaitingTasks.erase(iterator);
         }
+        CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
+      }
+
+      if(!mWaitingTasks.empty())
+      {
+        needCheckUnregisterProcessor = false;
       }
     }
 
-    if(!mRunningTasks.empty())
     {
-      for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
+      // Lock while remove task from the queue
+      Mutex::ScopedLock lock(mRunningTasksMutex);
+
+      auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+      if(mapIter != mCacheImpl->mRunningTasksCache.end())
       {
-        if((*iter).first == task)
+        for(auto iterator : mapIter->second)
         {
-          (*iter).second = true;
+          // We cannot erase container. Just mark as erased.
+          // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
+          iterator->second = true;
         }
+        CacheImpl::EraseAllTaskCache(mCacheImpl->mRunningTasksCache, task);
+      }
+
+      if(!mRunningTasks.empty())
+      {
+        needCheckUnregisterProcessor = false;
       }
     }
 
-    if(!mCompletedTasks.empty())
     {
-      for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
+      // Lock while remove task from the queue
+      Mutex::ScopedLock lock(mCompletedTasksMutex);
+
+      auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
+      if(mapIter != mCacheImpl->mCompletedTasksCache.end())
       {
-        if((*it) && (*it) == task)
+        for(auto iterator : mapIter->second)
         {
-          it = mCompletedTasks.erase(it);
+          mCompletedTasks.erase(iterator);
         }
-        else
+        CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
+      }
+
+      if(!mCompletedTasks.empty())
+      {
+        needCheckUnregisterProcessor = false;
+      }
+    }
+
+    // UnregisterProcessor required to lock mutex. Call that API only if required.
+    if(needCheckUnregisterProcessor)
+    {
+      UnregisterProcessor();
+    }
+  }
+}
+
+AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
+{
+  // Lock while popping task out from the queue
+  Mutex::ScopedLock lock(mCompletedTasksMutex);
+
+  if(mCompletedTasks.empty())
+  {
+    return AsyncTaskPtr();
+  }
+
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
+
+  auto         next     = mCompletedTasks.begin();
+  AsyncTaskPtr nextTask = *next;
+  CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
+  mCompletedTasks.erase(next);
+
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextTask.Get());
+
+  return nextTask;
+}
+
+void AsyncTaskManager::UnregisterProcessor()
+{
+  if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
+    // Keep processor at least 1 task exist.
+    // Please be careful the order of mutex, to avoid dead lock.
+    // TODO : Should we lock all mutex rightnow?
+    Mutex::ScopedLock lock1(mWaitingTasksMutex);
+    if(mWaitingTasks.empty())
+    {
+      Mutex::ScopedLock lock2(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+      if(mRunningTasks.empty())
+      {
+        Mutex::ScopedLock lock3(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
+        if(mCompletedTasks.empty())
         {
-          it++;
+          Dali::Adaptor::Get().UnregisterProcessor(*this);
+          mProcessorRegistered = false;
         }
       }
     }
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
+  }
+}
+
+void AsyncTaskManager::TasksCompleted()
+{
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
+  while(AsyncTaskPtr task = PopNextCompletedTask())
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
+    CallbackBase::Execute(*(task->GetCompletedCallback()), task);
   }
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
 
   UnregisterProcessor();
 }
 
+void AsyncTaskManager::Process(bool postProcessor)
+{
+  TasksCompleted();
+}
+
+/// Worker thread called
 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
 {
   // Lock while popping task out from the queue
-  Mutex::ScopedLock lock(mMutex);
+  Mutex::ScopedLock lock(mWaitingTasksMutex);
+
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
 
   // pop out the next task from the queue
   AsyncTaskPtr nextTask = nullptr;
 
+  // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
+  if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
+  {
+    // For thread safety
+    Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+
+    if(mAvaliableLowPriorityTaskCounts == 0u)
+    {
+      // There are no avaliabe tasks to run now. Return nullptr.
+      return nextTask;
+    }
+  }
+
   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
   {
     if((*iter)->IsReady())
     {
       const auto priorityType  = (*iter)->GetPriorityType();
-      const bool taskAvaliable = (priorityType == AsyncTask::PriorityType::HIGH) || // Task always valid if it's priority is high
-                                 (mAvaliableLowPriorityTaskCounts > 0u);            // or priority is low, but we can use it.
+      bool       taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
+      if(!taskAvaliable)
+      {
+        // For thread safety
+        Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+
+        taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
+      }
 
       if(taskAvaliable)
       {
         nextTask = *iter;
 
         // Add Running queue
-        mRunningTasks.push_back(std::make_pair(nextTask, false));
-        mWaitingTasks.erase(iter);
+        {
+          // Lock while popping task out from the queue
+          Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
 
-        // Decrease avaliable task counts if it is low priority
-        if(priorityType == AsyncTask::PriorityType::LOW)
+          DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
+
+          auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, false));
+          CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
+
+          // Decrease avaliable task counts if it is low priority
+          if(priorityType == AsyncTask::PriorityType::LOW)
+          {
+            // We are under running task mutex. We can decrease it.
+            --mAvaliableLowPriorityTaskCounts;
+          }
+        }
+
+        if(priorityType == AsyncTask::PriorityType::HIGH)
         {
-          --mAvaliableLowPriorityTaskCounts;
+          // Decrease the number of waiting tasks for high priority.
+          --mWaitingHighProirityTaskCounts;
         }
+
+        CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
+        mWaitingTasks.erase(iter);
         break;
       }
     }
   }
 
-  return nextTask;
-}
-
-AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
-{
-  // Lock while popping task out from the queue
-  Mutex::ScopedLock lock(mMutex);
-
-  if(mCompletedTasks.empty())
-  {
-    return AsyncTaskPtr();
-  }
-
-  std::vector<AsyncTaskPtr>::iterator next     = mCompletedTasks.begin();
-  AsyncTaskPtr                        nextTask = *next;
-  mCompletedTasks.erase(next);
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
 
   return nextTask;
 }
 
+/// Worker thread called
 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
 {
   // Lock while adding task to the queue
   {
-    Mutex::ScopedLock lock(mMutex);
+    Mutex::ScopedLock lock(mRunningTasksMutex);
+
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p]\n", task.Get());
 
+    // Note : The number of mRunningTasks size will not be over than thread count. Just linear iterate.
     for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
     {
       if((*iter).first == task)
@@ -336,19 +578,27 @@ void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
         {
           if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
           {
-            mCompletedTasks.push_back(task);
+            Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+
+            DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
+
+            auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
+            CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
           }
-        }
 
-        // Delete this task in running queue
-        mRunningTasks.erase(iter);
+          CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
+        }
 
         // Increase avaliable task counts if it is low priority
         const auto priorityType = task->GetPriorityType();
         if(priorityType == AsyncTask::PriorityType::LOW)
         {
+          // We are under running task mutex. We can increase it.
           ++mAvaliableLowPriorityTaskCounts;
         }
+
+        // Delete this task in running queue
+        mRunningTasks.erase(iter);
         break;
       }
     }
@@ -357,41 +607,17 @@ void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
   // wake up the main thread
   if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
   {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
     mTrigger->Trigger();
   }
   else
   {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
   }
 }
 
-void AsyncTaskManager::UnregisterProcessor()
-{
-  if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
-  {
-    Mutex::ScopedLock lock(mMutex);
-    if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
-    {
-      Dali::Adaptor::Get().UnregisterProcessor(*this);
-      mProcessorRegistered = false;
-    }
-  }
-}
-
-void AsyncTaskManager::TasksCompleted()
-{
-  while(AsyncTaskPtr task = PopNextCompletedTask())
-  {
-    CallbackBase::Execute(*(task->GetCompletedCallback()), task);
-  }
-
-  UnregisterProcessor();
-}
-
-void AsyncTaskManager::Process(bool postProcessor)
-{
-  TasksCompleted();
-}
+// AsyncTaskManager::TaskHelper
 
 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
index 2b31789..c33f275 100644 (file)
@@ -24,6 +24,7 @@
 #include <dali/devel-api/threading/thread.h>
 #include <dali/integration-api/adaptor-framework/log-factory-interface.h>
 #include <dali/integration-api/processor-interface.h>
+#include <dali/public-api/common/list-wrapper.h>
 #include <dali/public-api/object/base-object.h>
 #include <memory>
 
@@ -117,13 +118,6 @@ public:
   void RemoveTask(AsyncTaskPtr task);
 
   /**
-   * Pop the next task out from the queue.
-   *
-   * @return The next task to be processed.
-   */
-  AsyncTaskPtr PopNextTaskToProcess();
-
-  /**
    * Pop the next task out from the completed queue, called by main thread.
    *
    * @return The next task in the completed queue.
@@ -131,13 +125,6 @@ public:
   AsyncTaskPtr PopNextCompletedTask();
 
   /**
-   * Pop the next task out from the running queue and add this task to the completed queue.
-   *
-   * @param[in] task The task added to the queue.
-   */
-  void CompleteTask(AsyncTaskPtr task);
-
-  /**
    * @brief Unregister a previously registered processor
    */
   void UnregisterProcessor();
@@ -152,6 +139,21 @@ public:
    */
   void Process(bool postProcessor) override;
 
+public: // Worker thread called method
+  /**
+   * Pop the next task out from the queue.
+   *
+   * @return The next task to be processed.
+   */
+  AsyncTaskPtr PopNextTaskToProcess();
+
+  /**
+   * Pop the next task out from the running queue and add this task to the completed queue.
+   *
+   * @param[in] task The task added to the queue.
+   */
+  void CompleteTask(AsyncTaskPtr task);
+
 private:
   /**
    * @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
@@ -198,18 +200,31 @@ private:
   AsyncTaskManager& operator=(const AsyncTaskManager& manager);
 
 private:
-  std::vector<AsyncTaskPtr> mWaitingTasks;   //The queue of the tasks waiting to async process
-  std::vector<AsyncTaskPtr> mCompletedTasks; //The queue of the tasks with the async process
+  using AsyncTaskContainer = std::list<AsyncTaskPtr>;
 
-  using AsyncTaskPair = std::pair<AsyncTaskPtr, bool>;
-  std::vector<AsyncTaskPair> mRunningTasks; ///< The queue of the running tasks
+  using AsyncTaskPair             = std::pair<AsyncTaskPtr, bool>;
+  using AsyncRunningTaskContainer = std::list<AsyncTaskPair>;
+
+  AsyncTaskContainer        mWaitingTasks;   ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
+  AsyncRunningTaskContainer mRunningTasks;   ///< The queue of the running tasks. Must be locked under mRunningTasksMutex.
+  AsyncTaskContainer        mCompletedTasks; ///< The queue of the tasks with the async process. Must be locked under mCompletedTasksMutex.
 
   RoundRobinContainerView<TaskHelper> mTasks;
 
-  uint32_t mAvaliableLowPriorityTaskCounts; ///< The number of tasks that can be processed for proirity type LOW.
+  uint32_t mAvaliableLowPriorityTaskCounts; ///< The number of tasks that can be processed for priority type LOW.
+                                            ///< Be used to select next wating task determining algorithm.
+                                            ///< Note : For thread safety, Please set/get this value under mRunningTasksMutex scope.
+  uint32_t mWaitingHighProirityTaskCounts;  ///< The number of tasks that waiting now for priority type HIGH.
                                             ///< Be used to select next wating task determining algorithm.
+                                            ///< Note : For thread safety, Please set/get this value under mWaitingTasksMutex scope.
+
+  Dali::Mutex mWaitingTasksMutex;   ///< Mutex for mWaitingTasks. We can lock mRunningTasksMutex and mCompletedTasksMutex under this scope.
+  Dali::Mutex mRunningTasksMutex;   ///< Mutex for mRunningTasks. We can lock mCompletedTasksMutex under this scope.
+  Dali::Mutex mCompletedTasksMutex; ///< Mutex for mCompletedTasks. We cannot lock any mutex under this scope.
+
+  struct CacheImpl;
+  std::unique_ptr<CacheImpl> mCacheImpl; ///< Cache interface for AsyncTaskManager.
 
-  Dali::Mutex                          mMutex;
   std::unique_ptr<EventThreadCallback> mTrigger;
   bool                                 mProcessorRegistered;
 };