AsyncTaskManager overhead reduce 33/289933/9
authorEunki Hong <eunkiki.hong@samsung.com>
Wed, 15 Mar 2023 18:08:42 +0000 (03:08 +0900)
committerEunki, Hong <eunkiki.hong@samsung.com>
Tue, 28 Mar 2023 03:56:19 +0000 (12:56 +0900)
1. Let we use std::list instead of std::vector, to reduce 'erase' operation.
2. Seperate mutex per logic. For example, we don't need to wait mComplete container
   changed finished at AddTask called.
3. We can call RemoveTask even if task completed. So make that we don't iterate
   whole containers. Cache and erase directly what we need.

Change-Id: Id41d5370013c862614e29c62cd6bcd014005e008
Signed-off-by: Eunki Hong <eunkiki.hong@samsung.com>
dali/internal/system/common/async-task-manager-impl.cpp
dali/internal/system/common/async-task-manager-impl.h

index fa9b8c8..16f44ba 100644 (file)
@@ -24,6 +24,9 @@
 #include <dali/devel-api/common/singleton-service.h>
 #include <dali/integration-api/adaptor-framework/adaptor.h>
 #include <dali/integration-api/debug.h>
+#include <dali/public-api/common/vector-wrapper.h>
+
+#include <unordered_map>
 
 namespace Dali
 {
@@ -44,7 +47,7 @@ size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
 {
   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
-  constexpr auto MAX_NUMBER_OF_THREADS = 10u;
+  constexpr auto MAX_NUMBER_OF_THREADS = 16u;
   DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
   return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
 }
@@ -59,10 +62,14 @@ size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t def
 
 #if defined(DEBUG_ENABLED)
 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
+
+uint32_t gThreadId = 0u; // Only for debug
 #endif
 
 } // unnamed namespace
 
+// AsyncTaskThread
+
 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
 : mConditionalWait(),
   mAsyncTaskManager(asyncTaskManager),
@@ -112,7 +119,16 @@ bool AsyncTaskThread::Request()
 
 void AsyncTaskThread::Run()
 {
+#if defined(DEBUG_ENABLED)
+  uint32_t threadId = gThreadId++;
+  {
+    char temp[100];
+    snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
+    SetThreadName(temp);
+  }
+#else
   SetThreadName("AsyncTaskThread");
+#endif
   mLogFactory.InstallLogFunction();
 
   while(!mDestroyThread)
@@ -124,17 +140,99 @@ void AsyncTaskThread::Run()
       if(!mDestroyThread)
       {
         mIsThreadIdle = true;
+        DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
         mConditionalWait.Wait(lock);
+        DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
       }
     }
     else
     {
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
       task->Process();
-      mAsyncTaskManager.CompleteTask(task);
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
+      if(!mDestroyThread)
+      {
+        mAsyncTaskManager.CompleteTask(task);
+      }
     }
   }
 }
 
+// AsyncTaskManager::CacheImpl
+
+struct AsyncTaskManager::CacheImpl
+{
+  CacheImpl(AsyncTaskManager& manager)
+  : mManager(manager)
+  {
+  }
+
+public:
+  // Insert / Erase task cache API.
+
+  /**
+   * @brief Insert cache that input task.
+   * @pre Mutex be locked.
+   */
+  template<typename CacheContainer, typename Iterator>
+  static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
+  {
+    cacheMap[task.Get()].push_back(iterator);
+  }
+
+  /**
+   * @brief Erase cache that input task.
+   * @pre Mutex be locked.
+   */
+  template<typename CacheContainer, typename Iterator>
+  static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
+  {
+    auto mapIter = cacheMap.find(task.Get());
+    if(mapIter != cacheMap.end())
+    {
+      auto& cacheContainer = (*mapIter).second;
+      auto  cacheIter      = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
+
+      if(cacheIter != cacheContainer.end())
+      {
+        cacheContainer.erase(cacheIter);
+        if(cacheContainer.empty())
+        {
+          cacheMap.erase(mapIter);
+        }
+      }
+    }
+  }
+
+  /**
+   * @brief Erase all cache that input task.
+   * @pre Mutex be locked.
+   */
+  template<typename CacheContainer>
+  static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
+  {
+    auto mapIter = cacheMap.find(task.Get());
+    if(mapIter != cacheMap.end())
+    {
+      cacheMap.erase(mapIter);
+    }
+  }
+
+public:
+  AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
+
+  // TODO : Can't we use std::set instead of std::vector?
+  // It will be slowdown If someone AddTask multiple times for same task.
+  using TaskCacheContainer        = std::unordered_map<const AsyncTask*, std::vector<AsyncTaskContainer::iterator>>;
+  using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncRunningTaskContainer::iterator>>;
+
+  TaskCacheContainer        mWaitingTasksCache;   ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
+  RunningTaskCacheContainer mRunningTasksCache;   ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
+  TaskCacheContainer        mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
+};
+
+// AsyncTaskManager
+
 Dali::AsyncTaskManager AsyncTaskManager::Get()
 {
   Dali::AsyncTaskManager manager;
@@ -163,6 +261,8 @@ Dali::AsyncTaskManager AsyncTaskManager::Get()
 AsyncTaskManager::AsyncTaskManager()
 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
   mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
+  mWaitingHighProirityTaskCounts(0u),
+  mCacheImpl(new CacheImpl(*this)),
   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
   mProcessorRegistered(false)
 {
@@ -183,14 +283,29 @@ void AsyncTaskManager::AddTask(AsyncTaskPtr task)
   if(task)
   {
     // Lock while adding task to the queue
-    Mutex::ScopedLock lock(mMutex);
+    Mutex::ScopedLock lock(mWaitingTasksMutex);
 
-    mWaitingTasks.push_back(task);
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
 
-    // Finish all Running threads are working
-    if(mRunningTasks.size() >= mTasks.GetElementCount())
+    // push back into waiting queue.
+    auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
+    CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
+
+    if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
     {
-      return;
+      // Increase the number of waiting tasks for high priority.
+      ++mWaitingHighProirityTaskCounts;
+    }
+
+    {
+      // For thread safety
+      Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+
+      // Finish all Running threads are working
+      if(mRunningTasks.size() >= mTasks.GetElementCount())
+      {
+        return;
+      }
     }
   }
 
@@ -220,114 +335,241 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
 {
   if(task)
   {
-    // Lock while remove task from the queue
-    Mutex::ScopedLock lock(mMutex);
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
+
+    // Check whether we need to unregister processor.
+    // If there is some non-empty queue exist, we don't need to check unregister.
+    bool needCheckUnregisterProcessor = true;
 
-    if(!mWaitingTasks.empty())
     {
-      for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
+      // Lock while remove task from the queue
+      Mutex::ScopedLock lock(mWaitingTasksMutex);
+
+      auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
+      if(mapIter != mCacheImpl->mWaitingTasksCache.end())
       {
-        if((*it) && (*it) == task)
-        {
-          it = mWaitingTasks.erase(it);
-        }
-        else
+        for(auto iterator : mapIter->second)
         {
-          it++;
+          if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
+          {
+            // Decrease the number of waiting tasks for high priority.
+            --mWaitingHighProirityTaskCounts;
+          }
+          mWaitingTasks.erase(iterator);
         }
+        CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
+      }
+
+      if(!mWaitingTasks.empty())
+      {
+        needCheckUnregisterProcessor = false;
       }
     }
 
-    if(!mRunningTasks.empty())
     {
-      for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
+      // Lock while remove task from the queue
+      Mutex::ScopedLock lock(mRunningTasksMutex);
+
+      auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+      if(mapIter != mCacheImpl->mRunningTasksCache.end())
       {
-        if((*iter).first == task)
+        for(auto iterator : mapIter->second)
         {
-          (*iter).second = true;
+          // We cannot erase container. Just mark as erased.
+          // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
+          iterator->second = true;
         }
+        CacheImpl::EraseAllTaskCache(mCacheImpl->mRunningTasksCache, task);
+      }
+
+      if(!mRunningTasks.empty())
+      {
+        needCheckUnregisterProcessor = false;
       }
     }
 
-    if(!mCompletedTasks.empty())
     {
-      for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
+      // Lock while remove task from the queue
+      Mutex::ScopedLock lock(mCompletedTasksMutex);
+
+      auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
+      if(mapIter != mCacheImpl->mCompletedTasksCache.end())
       {
-        if((*it) && (*it) == task)
+        for(auto iterator : mapIter->second)
         {
-          it = mCompletedTasks.erase(it);
+          mCompletedTasks.erase(iterator);
         }
-        else
+        CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
+      }
+
+      if(!mCompletedTasks.empty())
+      {
+        needCheckUnregisterProcessor = false;
+      }
+    }
+
+    // UnregisterProcessor required to lock mutex. Call that API only if required.
+    if(needCheckUnregisterProcessor)
+    {
+      UnregisterProcessor();
+    }
+  }
+}
+
+AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
+{
+  // Lock while popping task out from the queue
+  Mutex::ScopedLock lock(mCompletedTasksMutex);
+
+  if(mCompletedTasks.empty())
+  {
+    return AsyncTaskPtr();
+  }
+
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
+
+  auto         next     = mCompletedTasks.begin();
+  AsyncTaskPtr nextTask = *next;
+  CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
+  mCompletedTasks.erase(next);
+
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextTask.Get());
+
+  return nextTask;
+}
+
+void AsyncTaskManager::UnregisterProcessor()
+{
+  if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
+    // Keep processor at least 1 task exist.
+    // Please be careful the order of mutex, to avoid dead lock.
+    // TODO : Should we lock all mutex rightnow?
+    Mutex::ScopedLock lock1(mWaitingTasksMutex);
+    if(mWaitingTasks.empty())
+    {
+      Mutex::ScopedLock lock2(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+      if(mRunningTasks.empty())
+      {
+        Mutex::ScopedLock lock3(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
+        if(mCompletedTasks.empty())
         {
-          it++;
+          Dali::Adaptor::Get().UnregisterProcessor(*this);
+          mProcessorRegistered = false;
         }
       }
     }
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
+  }
+}
+
+void AsyncTaskManager::TasksCompleted()
+{
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
+  while(AsyncTaskPtr task = PopNextCompletedTask())
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
+    CallbackBase::Execute(*(task->GetCompletedCallback()), task);
   }
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
 
   UnregisterProcessor();
 }
 
+void AsyncTaskManager::Process(bool postProcessor)
+{
+  TasksCompleted();
+}
+
+/// Worker thread called
 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
 {
   // Lock while popping task out from the queue
-  Mutex::ScopedLock lock(mMutex);
+  Mutex::ScopedLock lock(mWaitingTasksMutex);
+
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
 
   // pop out the next task from the queue
   AsyncTaskPtr nextTask = nullptr;
 
+  // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
+  if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
+  {
+    // For thread safety
+    Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+
+    if(mAvaliableLowPriorityTaskCounts == 0u)
+    {
+      // There are no avaliabe tasks to run now. Return nullptr.
+      return nextTask;
+    }
+  }
+
   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
   {
     if((*iter)->IsReady())
     {
       const auto priorityType  = (*iter)->GetPriorityType();
-      const bool taskAvaliable = (priorityType == AsyncTask::PriorityType::HIGH) || // Task always valid if it's priority is high
-                                 (mAvaliableLowPriorityTaskCounts > 0u);            // or priority is low, but we can use it.
+      bool       taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
+      if(!taskAvaliable)
+      {
+        // For thread safety
+        Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+
+        taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
+      }
 
       if(taskAvaliable)
       {
         nextTask = *iter;
 
         // Add Running queue
-        mRunningTasks.push_back(std::make_pair(nextTask, false));
-        mWaitingTasks.erase(iter);
+        {
+          // Lock while popping task out from the queue
+          Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
 
-        // Decrease avaliable task counts if it is low priority
-        if(priorityType == AsyncTask::PriorityType::LOW)
+          DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
+
+          auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, false));
+          CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
+
+          // Decrease avaliable task counts if it is low priority
+          if(priorityType == AsyncTask::PriorityType::LOW)
+          {
+            // We are under running task mutex. We can decrease it.
+            --mAvaliableLowPriorityTaskCounts;
+          }
+        }
+
+        if(priorityType == AsyncTask::PriorityType::HIGH)
         {
-          --mAvaliableLowPriorityTaskCounts;
+          // Decrease the number of waiting tasks for high priority.
+          --mWaitingHighProirityTaskCounts;
         }
+
+        CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
+        mWaitingTasks.erase(iter);
         break;
       }
     }
   }
 
-  return nextTask;
-}
-
-AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
-{
-  // Lock while popping task out from the queue
-  Mutex::ScopedLock lock(mMutex);
-
-  if(mCompletedTasks.empty())
-  {
-    return AsyncTaskPtr();
-  }
-
-  std::vector<AsyncTaskPtr>::iterator next     = mCompletedTasks.begin();
-  AsyncTaskPtr                        nextTask = *next;
-  mCompletedTasks.erase(next);
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
 
   return nextTask;
 }
 
+/// Worker thread called
 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
 {
   // Lock while adding task to the queue
   {
-    Mutex::ScopedLock lock(mMutex);
+    Mutex::ScopedLock lock(mRunningTasksMutex);
+
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p]\n", task.Get());
 
+    // Note : The number of mRunningTasks size will not be over than thread count. Just linear iterate.
     for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
     {
       if((*iter).first == task)
@@ -336,19 +578,27 @@ void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
         {
           if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
           {
-            mCompletedTasks.push_back(task);
+            Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+
+            DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
+
+            auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
+            CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
           }
-        }
 
-        // Delete this task in running queue
-        mRunningTasks.erase(iter);
+          CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
+        }
 
         // Increase avaliable task counts if it is low priority
         const auto priorityType = task->GetPriorityType();
         if(priorityType == AsyncTask::PriorityType::LOW)
         {
+          // We are under running task mutex. We can increase it.
           ++mAvaliableLowPriorityTaskCounts;
         }
+
+        // Delete this task in running queue
+        mRunningTasks.erase(iter);
         break;
       }
     }
@@ -357,41 +607,17 @@ void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
   // wake up the main thread
   if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
   {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
     mTrigger->Trigger();
   }
   else
   {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
   }
 }
 
-void AsyncTaskManager::UnregisterProcessor()
-{
-  if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
-  {
-    Mutex::ScopedLock lock(mMutex);
-    if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
-    {
-      Dali::Adaptor::Get().UnregisterProcessor(*this);
-      mProcessorRegistered = false;
-    }
-  }
-}
-
-void AsyncTaskManager::TasksCompleted()
-{
-  while(AsyncTaskPtr task = PopNextCompletedTask())
-  {
-    CallbackBase::Execute(*(task->GetCompletedCallback()), task);
-  }
-
-  UnregisterProcessor();
-}
-
-void AsyncTaskManager::Process(bool postProcessor)
-{
-  TasksCompleted();
-}
+// AsyncTaskManager::TaskHelper
 
 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
index 2b31789..c33f275 100644 (file)
@@ -24,6 +24,7 @@
 #include <dali/devel-api/threading/thread.h>
 #include <dali/integration-api/adaptor-framework/log-factory-interface.h>
 #include <dali/integration-api/processor-interface.h>
+#include <dali/public-api/common/list-wrapper.h>
 #include <dali/public-api/object/base-object.h>
 #include <memory>
 
@@ -117,13 +118,6 @@ public:
   void RemoveTask(AsyncTaskPtr task);
 
   /**
-   * Pop the next task out from the queue.
-   *
-   * @return The next task to be processed.
-   */
-  AsyncTaskPtr PopNextTaskToProcess();
-
-  /**
    * Pop the next task out from the completed queue, called by main thread.
    *
    * @return The next task in the completed queue.
@@ -131,13 +125,6 @@ public:
   AsyncTaskPtr PopNextCompletedTask();
 
   /**
-   * Pop the next task out from the running queue and add this task to the completed queue.
-   *
-   * @param[in] task The task added to the queue.
-   */
-  void CompleteTask(AsyncTaskPtr task);
-
-  /**
    * @brief Unregister a previously registered processor
    */
   void UnregisterProcessor();
@@ -152,6 +139,21 @@ public:
    */
   void Process(bool postProcessor) override;
 
+public: // Worker thread called method
+  /**
+   * Pop the next task out from the queue.
+   *
+   * @return The next task to be processed.
+   */
+  AsyncTaskPtr PopNextTaskToProcess();
+
+  /**
+   * Pop the next task out from the running queue and add this task to the completed queue.
+   *
+   * @param[in] task The task added to the queue.
+   */
+  void CompleteTask(AsyncTaskPtr task);
+
 private:
   /**
    * @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
@@ -198,18 +200,31 @@ private:
   AsyncTaskManager& operator=(const AsyncTaskManager& manager);
 
 private:
-  std::vector<AsyncTaskPtr> mWaitingTasks;   //The queue of the tasks waiting to async process
-  std::vector<AsyncTaskPtr> mCompletedTasks; //The queue of the tasks with the async process
+  using AsyncTaskContainer = std::list<AsyncTaskPtr>;
 
-  using AsyncTaskPair = std::pair<AsyncTaskPtr, bool>;
-  std::vector<AsyncTaskPair> mRunningTasks; ///< The queue of the running tasks
+  using AsyncTaskPair             = std::pair<AsyncTaskPtr, bool>;
+  using AsyncRunningTaskContainer = std::list<AsyncTaskPair>;
+
+  AsyncTaskContainer        mWaitingTasks;   ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
+  AsyncRunningTaskContainer mRunningTasks;   ///< The queue of the running tasks. Must be locked under mRunningTasksMutex.
+  AsyncTaskContainer        mCompletedTasks; ///< The queue of the tasks with the async process. Must be locked under mCompletedTasksMutex.
 
   RoundRobinContainerView<TaskHelper> mTasks;
 
-  uint32_t mAvaliableLowPriorityTaskCounts; ///< The number of tasks that can be processed for proirity type LOW.
+  uint32_t mAvaliableLowPriorityTaskCounts; ///< The number of tasks that can be processed for priority type LOW.
+                                            ///< Be used to select next wating task determining algorithm.
+                                            ///< Note : For thread safety, Please set/get this value under mRunningTasksMutex scope.
+  uint32_t mWaitingHighProirityTaskCounts;  ///< The number of tasks that waiting now for priority type HIGH.
                                             ///< Be used to select next wating task determining algorithm.
+                                            ///< Note : For thread safety, Please set/get this value under mWaitingTasksMutex scope.
+
+  Dali::Mutex mWaitingTasksMutex;   ///< Mutex for mWaitingTasks. We can lock mRunningTasksMutex and mCompletedTasksMutex under this scope.
+  Dali::Mutex mRunningTasksMutex;   ///< Mutex for mRunningTasks. We can lock mCompletedTasksMutex under this scope.
+  Dali::Mutex mCompletedTasksMutex; ///< Mutex for mCompletedTasks. We cannot lock any mutex under this scope.
+
+  struct CacheImpl;
+  std::unique_ptr<CacheImpl> mCacheImpl; ///< Cache interface for AsyncTaskManager.
 
-  Dali::Mutex                          mMutex;
   std::unique_ptr<EventThreadCallback> mTrigger;
   bool                                 mProcessorRegistered;
 };