{
namespace
{
+constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
+
constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
: mConditionalWait(),
mAsyncTaskManager(asyncTaskManager),
mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
+ mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
mDestroyThread(false),
mIsThreadStarted(false),
mIsThreadIdle(true)
SetThreadName("AsyncTaskThread");
#endif
mLogFactory.InstallLogFunction();
+ mTraceFactory.InstallTraceFunction();
while(!mDestroyThread)
{
Dali::Adaptor::Get().UnregisterProcessor(*this);
}
+ // Join all threads.
mTasks.Clear();
+
+ // Remove cache impl after all threads are join.
+ mCacheImpl.reset();
+
+ // Remove tasks after CacheImpl removed
+ mWaitingTasks.clear();
+ mRunningTasks.clear();
+ mCompletedTasks.clear();
}
void AsyncTaskManager::AddTask(AsyncTaskPtr task)
auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
+ CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
+ mWaitingTasks.erase(iter);
+
// Decrease avaliable task counts if it is low priority
if(priorityType == AsyncTask::PriorityType::LOW)
{
// Decrease the number of waiting tasks for high priority.
--mWaitingHighProirityTaskCounts;
}
-
- CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
- mWaitingTasks.erase(iter);
break;
}
}
if(task)
{
- // Lock while adding task to the queue
+ bool needTrigger = (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
+
+ // Lock while check validation of task.
{
Mutex::ScopedLock lock(mRunningTasksMutex);
// This task is valid.
notify = true;
}
-
- const auto priorityType = iter->first->GetPriorityType();
- // Increase avaliable task counts if it is low priority
- if(priorityType == AsyncTask::PriorityType::LOW)
- {
- // We are under running task mutex. We can increase it.
- ++mAvaliableLowPriorityTaskCounts;
- }
- CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
- mRunningTasks.erase(iter);
}
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
CallbackBase::Execute(*(task->GetCompletedCallback()), task);
}
- const bool needTrigger = task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD;
-
- // Move task into completed, for ensure that AsyncTask destroy at main thread.
+ // Lock while adding task to the queue
{
- Mutex::ScopedLock lock(mCompletedTasksMutex);
+ Mutex::ScopedLock lock(mRunningTasksMutex);
- const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
+ auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+ if(mapIter != mCacheImpl->mRunningTasksCache.end())
+ {
+ const auto cacheIter = mapIter->second.begin();
+ DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
+
+ const auto iter = *cacheIter;
+ const auto priorityType = iter->first->GetPriorityType();
+ // Increase avaliable task counts if it is low priority
+ if(priorityType == AsyncTask::PriorityType::LOW)
+ {
+ // We are under running task mutex. We can increase it.
+ ++mAvaliableLowPriorityTaskCounts;
+ }
+
+ // Move task into completed, for ensure that AsyncTask destroy at main thread.
+ {
+ Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+
+ const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
+
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
+ auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
+ CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
- auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
- CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
+ CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
+ mRunningTasks.erase(iter);
- // Now, task is invalidate.
- task.Reset();
+ if(!needTrigger)
+ {
+ needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
+ }
+
+ // Now, task is invalidate.
+ task.Reset();
+ }
+ }
}
// Wake up the main thread