DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
if(!mDestroyThread)
{
- mAsyncTaskManager.CompleteTask(task);
+ mAsyncTaskManager.CompleteTask(std::move(task));
}
}
}
AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
// Keep cache iterators as list since we take tasks by FIFO as default.
- using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
- using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
+ using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
+ using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
+ using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
- TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
- RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
- TaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
+ TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
+ RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
+ CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
};
// AsyncTaskManager
{
for(auto& iterator : mapIter->second)
{
- DALI_ASSERT_DEBUG((*iterator) == task);
+ DALI_ASSERT_DEBUG(iterator->first == task);
mCompletedTasks.erase(iterator);
}
CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
// Lock while popping task out from the queue
Mutex::ScopedLock lock(mCompletedTasksMutex);
- if(mCompletedTasks.empty())
+ AsyncTaskPtr nextCompletedTask = nullptr;
+
+ while(!mCompletedTasks.empty())
{
- return AsyncTaskPtr();
- }
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
+ auto next = mCompletedTasks.begin();
+ AsyncTaskPtr nextTask = next->first;
+ CompletedTaskState taskState = next->second;
+ CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
+ mCompletedTasks.erase(next);
- auto next = mCompletedTasks.begin();
- AsyncTaskPtr nextTask = *next;
- CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
- mCompletedTasks.erase(next);
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextCompletedTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextTask.Get());
+ if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
+ {
+ nextCompletedTask = nextTask;
+ break;
+ }
+ }
- return nextTask;
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
+
+ return nextCompletedTask;
}
void AsyncTaskManager::UnregisterProcessor()
}
/// Worker thread called
-void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
+void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
{
bool notify = false;
- // Lock while adding task to the queue
if(task)
{
- Mutex::ScopedLock lock(mRunningTasksMutex);
-
- auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
- if(mapIter != mCacheImpl->mRunningTasksCache.end())
+ // Lock while adding task to the queue
{
- const auto cacheIter = mapIter->second.begin();
- DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
+ Mutex::ScopedLock lock(mRunningTasksMutex);
- const auto iter = *cacheIter;
- DALI_ASSERT_DEBUG(iter->first == task);
- if(iter->second == RunningTaskState::RUNNING)
+ auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+ if(mapIter != mCacheImpl->mRunningTasksCache.end())
{
- // This task is valid.
- notify = true;
- }
+ const auto cacheIter = mapIter->second.begin();
+ DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
- const auto priorityType = iter->first->GetPriorityType();
- // Increase avaliable task counts if it is low priority
- if(priorityType == AsyncTask::PriorityType::LOW)
- {
- // We are under running task mutex. We can increase it.
- ++mAvaliableLowPriorityTaskCounts;
+ const auto iter = *cacheIter;
+ DALI_ASSERT_DEBUG(iter->first == task);
+ if(iter->second == RunningTaskState::RUNNING)
+ {
+ // This task is valid.
+ notify = true;
+ }
+
+ const auto priorityType = iter->first->GetPriorityType();
+ // Increase avaliable task counts if it is low priority
+ if(priorityType == AsyncTask::PriorityType::LOW)
+ {
+ // We are under running task mutex. We can increase it.
+ ++mAvaliableLowPriorityTaskCounts;
+ }
+ CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
+ mRunningTasks.erase(iter);
}
- CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
- mRunningTasks.erase(iter);
+
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
+ }
+
+ // We should execute this tasks complete callback out of mutex
+ if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
+ {
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
+ CallbackBase::Execute(*(task->GetCompletedCallback()), task);
}
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
+ const bool needTrigger = task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD;
- // We should move the task to compeleted task under mRunningTaskMutex.
- if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+ // Move task into completed, for ensure that AsyncTask destroy at main thread.
{
- Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+ Mutex::ScopedLock lock(mCompletedTasksMutex);
+
+ const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
- auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
+ auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
+
+ // Now, task is invalidate.
+ task.Reset();
}
- }
- // We should execute this tasks complete callback out of mutex
- if(notify)
- {
- if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+ // Wake up the main thread
+ if(needTrigger)
{
- // wake up the main thread
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
mTrigger->Trigger();
}
- else // task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
- CallbackBase::Execute(*(task->GetCompletedCallback()), task);
- }
}
}