Merge "DALi Version 2.3.2" into devel/master
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
index 6a3eaf1..1fdb733 100644 (file)
@@ -161,6 +161,338 @@ void AsyncTaskThread::Run()
   }
 }
 
+// AsyncTaskManager::TasksCompletedImpl
+
+struct AsyncTaskManager::TasksCompletedImpl
+{
+  TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
+  : mManager(manager),
+    mTrigger(trigger),
+    mEmitCompletedTaskTriggered(false)
+  {
+  }
+
+public:
+  /**
+   * @brief Create new tasks completed id and.
+   * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
+   * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
+   */
+  Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
+  {
+    // Lock while adding tasks completed callback list to the queue
+    Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
+
+    auto id = mTasksCompletedCount++;
+    DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
+
+    mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
+
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "GenerateTasksCompletedId id[%u] callback[%p]\n", id, callback);
+    return id;
+  }
+
+  /**
+   * @brief Append task that will be trace.
+   * @post RemoveTaskTrace should be called.
+   * @param[in] id The id of tasks completed.
+   * @param[in] task The task want to trace.
+   */
+  void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AppendTaskTrace id[%u] task[%p]\n", id, task.Get());
+
+    // Lock while adding tasks completed callback list to the queue
+    Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
+
+    auto iter = mTasksCompletedCallbackList.find(id);
+    if(iter == mTasksCompletedCallbackList.end())
+    {
+      // This task is already erased. Ignore.
+      return;
+    }
+
+    auto& callbackData = iter->second;
+
+    auto jter = callbackData.mTasks.find(task.Get());
+
+    if(jter != callbackData.mTasks.end())
+    {
+      // Increase reference count.
+      ++(jter->second);
+    }
+    else
+    {
+      callbackData.mTasks.insert({task.Get(), 1u});
+    }
+  }
+
+  /**
+   * @brief Remove all task that were traced.
+   * @param[in] task The task want to remove trace.
+   * @param[in] taskCount The number of tasks that will be removed.
+   */
+  void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
+  {
+    if(count == 0u)
+    {
+      return;
+    }
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace task[%p] remove count[%u]\n", task.Get(), count);
+
+    // Lock while removing tasks completed callback list to the queue
+    Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
+
+    for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
+    {
+      auto& callbackData      = iter->second;
+      bool  eraseCallbackData = false;
+
+      auto jter = callbackData.mTasks.find(task.Get());
+
+      if(jter != callbackData.mTasks.end())
+      {
+        DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task[%p], current refcount[%u]\n", iter->first, task.Get(), (jter->second));
+
+        if(jter->second <= count)
+        {
+          callbackData.mTasks.erase(jter);
+
+          DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task erased. remained tasks[%zu]", iter->first, callbackData.mTasks.size());
+
+          if(callbackData.mTasks.empty())
+          {
+            eraseCallbackData = true;
+
+            // Move callback base into list.
+            // (To avoid task container changed during callback emit)
+            RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
+
+            DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
+
+            iter = mTasksCompletedCallbackList.erase(iter);
+          }
+        }
+        else
+        {
+          jter->second -= count;
+        }
+      }
+
+      if(!eraseCallbackData)
+      {
+        ++iter;
+      }
+    }
+  }
+
+  /**
+   * @brief Check whether current TasksCompletedId completed or not.
+   * @param[in] id The id of tasks completed.
+   * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
+   */
+  bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CheckTasksCompletedCallbackCompleted[%u]\n", id);
+
+    // Lock while removing tasks completed callback list to the queue
+    Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
+
+    auto iter = mTasksCompletedCallbackList.find(id);
+    if(iter != mTasksCompletedCallbackList.end())
+    {
+      auto& callbackData = iter->second;
+      if(callbackData.mTasks.empty())
+      {
+        // Move callback base into list.
+        // (To avoid task container changed during callback emit)
+        RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
+
+        DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
+
+        iter = mTasksCompletedCallbackList.erase(iter);
+
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * @brief Remove taskS completed callbacks by id.
+   * @param[in] id The id of taskS completed.
+   * @return True if taskS completed id removed. False otherwise.
+   */
+  bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTasksCompleted[%u]\n", id);
+
+    // Lock while removing taskS completed callback list to the queue
+    Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
+
+    auto iter = mTasksCompletedCallbackList.find(id);
+    if(iter == mTasksCompletedCallbackList.end())
+    {
+      // This task is already erased, or completed.
+      // Erase from completed excute callback list.
+
+      // Lock while removing excute callback list to the queue
+      Mutex::ScopedLock lock(mExcuteCallbacksMutex);
+
+      for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
+      {
+        if(iter->second == id)
+        {
+          iter = mExcuteCallbackList.erase(iter);
+
+          return true;
+        }
+        else
+        {
+          ++iter;
+        }
+      }
+
+      // This task is alread erased and completed. Ignore.
+      return false;
+    }
+
+    mTasksCompletedCallbackList.erase(iter);
+
+    return true;
+  }
+
+  /**
+   * @brief Emit all completed callbacks.
+   * @note This API should be called at event thread.
+   */
+  void EmitCompletedTasks()
+  {
+    ExecuteCallbackContainer executeCallbackList;
+    {
+      // Lock while removing excute callback list to the queue
+      Mutex::ScopedLock lock(mExcuteCallbacksMutex);
+
+      mEmitCompletedTaskTriggered = false;
+
+      // Copy callback lists, for let we execute callbacks out of mutex
+      executeCallbackList = std::move(mExcuteCallbackList);
+      mExcuteCallbackList.clear();
+    }
+
+    if(!executeCallbackList.empty())
+    {
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback count[%zu]\n", executeCallbackList.size());
+      // Execute all callbacks
+      for(auto&& callbackPair : executeCallbackList)
+      {
+        auto& callback = callbackPair.first;
+        auto  id       = callbackPair.second;
+
+        DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute taskS completed callback[%p] for id[%u]\n", callback.get(), id);
+
+        Dali::CallbackBase::Execute(*callback, id);
+      }
+
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback end\n");
+    }
+  }
+
+  /**
+   * @brief Check whether there is some completed signal what we need to trace, or not.
+   * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
+   */
+  bool IsTasksCompletedCallbackExist()
+  {
+    Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
+    return !mTasksCompletedCallbackList.empty();
+  }
+
+  /**
+   * @brief Check whether there is some completed signal what we need to execute, or not.
+   * @return True if mExcuteCallbackList is not empty. False otherwise.
+   */
+  bool IsExecuteCallbackExist()
+  {
+    Mutex::ScopedLock lock(mExcuteCallbacksMutex);
+    return !mExcuteCallbackList.empty();
+  }
+
+private:
+  void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted[%u] need to be execute with callback[%p]\n", id, callback.get());
+
+    // Lock while adding excute callback list to the queue
+    Mutex::ScopedLock lock(mExcuteCallbacksMutex);
+
+    mExcuteCallbackList.emplace_back(std::move(callback), id);
+
+    if(!mEmitCompletedTaskTriggered)
+    {
+      mEmitCompletedTaskTriggered = true;
+
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger processor\n");
+      mTrigger->Trigger();
+    }
+  }
+
+private:
+  struct CallbackData
+  {
+  public:
+    CallbackData(CallbackBase* callback)
+    : mCallback(callback),
+      mTasks()
+    {
+    }
+
+    CallbackData(CallbackData&& rhs) noexcept
+    : mCallback(std::move(rhs.mCallback)),
+      mTasks(std::move(rhs.mTasks))
+    {
+    }
+
+    CallbackData& operator=(CallbackData&& rhs) noexcept
+    {
+      if(this != &rhs)
+      {
+        mCallback = std::move(rhs.mCallback);
+        mTasks    = std::move(rhs.mTasks);
+      }
+
+      return *this;
+    }
+
+  private:
+    // Delete copy operator.
+    CallbackData(const CallbackData& rhs) = delete;
+    CallbackData& operator=(const CallbackData& rhs) = delete;
+
+  public:
+    std::unique_ptr<CallbackBase>                  mCallback;
+    std::unordered_map<const AsyncTask*, uint32_t> mTasks;
+  };
+
+private:
+  AsyncTaskManager&    mManager; ///< Owner of this CacheImpl.
+  EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
+
+  Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
+
+  using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
+  TasksCompletedContainer mTasksCompletedCallbackList;
+
+  using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
+  ExecuteCallbackContainer mExcuteCallbackList;
+
+  Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
+  Dali::Mutex mExcuteCallbacksMutex;         ///< Mutex for mExcuteCallbackList.
+
+  bool mEmitCompletedTaskTriggered : 1;
+};
+
 // AsyncTaskManager::CacheImpl
 
 struct AsyncTaskManager::CacheImpl
@@ -266,8 +598,9 @@ AsyncTaskManager::AsyncTaskManager()
 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
   mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
   mWaitingHighProirityTaskCounts(0u),
-  mCacheImpl(new CacheImpl(*this)),
   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
+  mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get())),
+  mCacheImpl(new CacheImpl(*this)),
   mProcessorRegistered(false)
 {
 }
@@ -283,7 +616,8 @@ AsyncTaskManager::~AsyncTaskManager()
   // Join all threads.
   mTasks.Clear();
 
-  // Remove cache impl after all threads are join.
+  // Remove task completed impl and cache impl after all threads are join.
+  mTasksCompletedImpl.reset();
   mCacheImpl.reset();
 
   // Remove tasks after CacheImpl removed
@@ -337,11 +671,7 @@ void AsyncTaskManager::AddTask(AsyncTaskPtr task)
   }
 
   // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
-  if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
-  {
-    Dali::Adaptor::Get().RegisterProcessor(*this);
-    mProcessorRegistered = true;
-  }
+  RegisterProcessor();
 
   return;
 }
@@ -356,6 +686,8 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
     // If there is some non-empty queue exist, we don't need to unregister processor.
     bool needCheckUnregisterProcessor = true;
 
+    uint32_t removedCount = 0u;
+
     {
       // Lock while remove task from the queue
       Mutex::ScopedLock lock(mWaitingTasksMutex);
@@ -372,6 +704,7 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
             --mWaitingHighProirityTaskCounts;
           }
           mWaitingTasks.erase(iterator);
+          ++removedCount;
         }
         CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
       }
@@ -394,7 +727,11 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
           DALI_ASSERT_DEBUG((*iterator).first == task);
           // We cannot erase container. Just mark as canceled.
           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
-          (*iterator).second = RunningTaskState::CANCELED;
+          if((*iterator).second == RunningTaskState::RUNNING)
+          {
+            (*iterator).second = RunningTaskState::CANCELED;
+            ++removedCount;
+          }
         }
       }
 
@@ -413,7 +750,11 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
       {
         for(auto& iterator : mapIter->second)
         {
-          DALI_ASSERT_DEBUG(iterator->first == task);
+          DALI_ASSERT_DEBUG((*iterator).first == task);
+          if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
+          {
+            ++removedCount;
+          }
           mCompletedTasks.erase(iterator);
         }
         CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
@@ -425,6 +766,12 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
       }
     }
 
+    // Remove TasksCompleted callback trace
+    if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
+    {
+      mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
+    }
+
     // UnregisterProcessor required to lock mutex. Call this API only if required.
     if(needCheckUnregisterProcessor)
     {
@@ -433,37 +780,139 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
   }
 }
 
-AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
+Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
 {
-  // Lock while popping task out from the queue
-  Mutex::ScopedLock lock(mCompletedTasksMutex);
+  // mTasksCompletedImpl will take ownership of callback.
+  Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
 
-  AsyncTaskPtr nextCompletedTask = nullptr;
+  bool taskAdded = false; ///< Flag whether at least one task tracing now.
 
-  while(!mCompletedTasks.empty())
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "SetCompletedCallback id : %u, mask : %d\n", tasksCompletedId, static_cast<int32_t>(mask));
+
+  // Please be careful the order of mutex, to avoid dead lock.
   {
-    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
+    Mutex::ScopedLock lockWait(mWaitingTasksMutex);
+    {
+      Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+      {
+        Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
 
-    auto               next      = mCompletedTasks.begin();
-    AsyncTaskPtr       nextTask  = next->first;
-    CompletedTaskState taskState = next->second;
-    CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
-    mCompletedTasks.erase(next);
+        // Collect all tasks from waiting tasks
+        for(auto& task : mWaitingTasks)
+        {
+          auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
+                           (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
 
-    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
+          if((checkMask & mask) == checkMask)
+          {
+            taskAdded = true;
+            mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
+          }
+        }
 
-    if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
-    {
-      nextCompletedTask = nextTask;
-      break;
+        // Collect all tasks from running tasks
+        for(auto& taskPair : mRunningTasks)
+        {
+          // Trace only if it is running now.
+          if(taskPair.second == RunningTaskState::RUNNING)
+          {
+            auto& task      = taskPair.first;
+            auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
+                             (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
+
+            if((checkMask & mask) == checkMask)
+            {
+              taskAdded = true;
+              mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
+            }
+          }
+        }
+
+        // Collect all tasks from complete tasks
+        for(auto& taskPair : mCompletedTasks)
+        {
+          // Trace only if it is need callback.
+          // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
+          //        If worker thread invocation, than it already remove trace at completed timing.
+          //        If canceled cases, we don't append trace at running tasks already.
+          //        So, we don't need to trace for SKIP_CALLBACK cases.
+          if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
+          {
+            auto& task      = taskPair.first;
+            auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
+                             (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
+
+            if((checkMask & mask) == checkMask)
+            {
+              taskAdded = true;
+              mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
+            }
+          }
+        }
+      }
     }
   }
 
-  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
+  // If there is nothing to check task, just excute callback right now.
+  if(!taskAdded)
+  {
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompletedCallback id[%u] executed now due to no task exist\n", tasksCompletedId);
+
+    mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
+  }
+  return tasksCompletedId;
+}
+
+bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
+{
+  return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
+}
+
+AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
+{
+  std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
+
+  AsyncTaskPtr nextCompletedTask = nullptr;
+  {
+    // Lock while popping task out from the queue
+    Mutex::ScopedLock lock(mCompletedTasksMutex);
+
+    while(!mCompletedTasks.empty())
+    {
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
+
+      auto               next      = mCompletedTasks.begin();
+      AsyncTaskPtr       nextTask  = next->first;
+      CompletedTaskState taskState = next->second;
+      CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
+      mCompletedTasks.erase(next);
+
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
+
+      if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
+      {
+        nextCompletedTask = nextTask;
+        break;
+      }
+
+      ignoredTaskList.push_back(nextTask);
+    }
+
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
+  }
 
   return nextCompletedTask;
 }
 
+void AsyncTaskManager::RegisterProcessor()
+{
+  if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
+  {
+    Dali::Adaptor::Get().RegisterProcessor(*this);
+    mProcessorRegistered = true;
+  }
+}
+
 void AsyncTaskManager::UnregisterProcessor()
 {
   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
@@ -497,10 +946,18 @@ void AsyncTaskManager::TasksCompleted()
   {
     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+
+    // Remove TasksCompleted callback trace
+    if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
+    {
+      mTasksCompletedImpl->RemoveTaskTrace(task);
+    }
   }
 
   UnregisterProcessor();
   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
+
+  mTasksCompletedImpl->EmitCompletedTasks();
 }
 
 void AsyncTaskManager::Process(bool postProcessor)
@@ -546,6 +1003,22 @@ AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
         taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
       }
 
+      // Check whether we try to running same task at multiple threads.
+      if(taskAvaliable)
+      {
+        Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+        auto              mapIter = mCacheImpl->mRunningTasksCache.find((*iter).Get());
+        if(mapIter != mCacheImpl->mRunningTasksCache.end())
+        {
+          if(!mapIter->second.empty())
+          {
+            // Some other thread running this tasks now. Ignore it.
+            DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Some other thread running this task [%p]\n", (*iter).Get());
+            taskAvaliable = false;
+          }
+        }
+      }
+
       if(taskAvaliable)
       {
         nextTask = *iter;
@@ -593,7 +1066,7 @@ void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
 
   if(task)
   {
-    bool needTrigger = (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
+    bool needTrigger = false;
 
     // Lock while check validation of task.
     {
@@ -622,6 +1095,18 @@ void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
     {
       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+
+      // We need to remove task trace now.
+      if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
+      {
+        mTasksCompletedImpl->RemoveTaskTrace(task);
+
+        if(mTasksCompletedImpl->IsExecuteCallbackExist())
+        {
+          // We need to call EmitCompletedTasks(). Trigger main thread.
+          needTrigger = true;
+        }
+      }
     }
 
     // Lock while adding task to the queue
@@ -649,6 +1134,8 @@ void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
 
           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
 
+          needTrigger |= callbackRequired;
+
           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
 
           auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));