#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>
-#include <dali/public-api/common/vector-wrapper.h>
#include <unordered_map>
{
namespace
{
+constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
+
constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
constexpr auto MAX_NUMBER_OF_THREADS = 16u;
- DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
- return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
+ DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
+ return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
}
size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
: mConditionalWait(),
mAsyncTaskManager(asyncTaskManager),
mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
+ mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
mDestroyThread(false),
mIsThreadStarted(false),
mIsThreadIdle(true)
SetThreadName("AsyncTaskThread");
#endif
mLogFactory.InstallLogFunction();
+ mTraceFactory.InstallTraceFunction();
while(!mDestroyThread)
{
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
if(!mDestroyThread)
{
- mAsyncTaskManager.CompleteTask(task);
+ mAsyncTaskManager.CompleteTask(std::move(task));
}
}
}
template<typename CacheContainer, typename Iterator>
static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
{
- cacheMap[task.Get()].push_back(iterator);
+ auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
+ cacheContainer.insert(cacheContainer.end(), iterator);
}
/**
public:
AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
- // TODO : Can't we use std::set instead of std::vector?
- // It will be slowdown If someone AddTask multiple times for same task.
- using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncTaskContainer::iterator>>;
- using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncRunningTaskContainer::iterator>>;
+ // Keep cache iterators as list since we take tasks by FIFO as default.
+ using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
+ using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
+ using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
- TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
- RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
- TaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
+ TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
+ RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
+ CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
};
// AsyncTaskManager
{
if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
+ mProcessorRegistered = false;
Dali::Adaptor::Get().UnregisterProcessor(*this);
}
+ // Join all threads.
mTasks.Clear();
+
+ // Remove cache impl after all threads are join.
+ mCacheImpl.reset();
+
+ // Remove tasks after CacheImpl removed
+ mWaitingTasks.clear();
+ mRunningTasks.clear();
+ mCompletedTasks.clear();
}
void AsyncTaskManager::AddTask(AsyncTaskPtr task)
// If all threads are busy, then it's ok just to push the task because they will try to get the next job.
}
+ // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
Dali::Adaptor::Get().RegisterProcessor(*this);
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
// Check whether we need to unregister processor.
- // If there is some non-empty queue exist, we don't need to check unregister.
+ // If there is some non-empty queue exist, we don't need to unregister processor.
bool needCheckUnregisterProcessor = true;
{
auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
if(mapIter != mCacheImpl->mWaitingTasksCache.end())
{
- for(auto iterator : mapIter->second)
+ for(auto& iterator : mapIter->second)
{
+ DALI_ASSERT_DEBUG((*iterator) == task);
if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
{
// Decrease the number of waiting tasks for high priority.
auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
if(mapIter != mCacheImpl->mRunningTasksCache.end())
{
- for(auto iterator : mapIter->second)
+ for(auto& iterator : mapIter->second)
{
- // We cannot erase container. Just mark as erased.
+ DALI_ASSERT_DEBUG((*iterator).first == task);
+ // We cannot erase container. Just mark as canceled.
// Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
- iterator->second = true;
+ (*iterator).second = RunningTaskState::CANCELED;
}
- CacheImpl::EraseAllTaskCache(mCacheImpl->mRunningTasksCache, task);
}
if(!mRunningTasks.empty())
auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
if(mapIter != mCacheImpl->mCompletedTasksCache.end())
{
- for(auto iterator : mapIter->second)
+ for(auto& iterator : mapIter->second)
{
+ DALI_ASSERT_DEBUG(iterator->first == task);
mCompletedTasks.erase(iterator);
}
CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
}
}
- // UnregisterProcessor required to lock mutex. Call that API only if required.
+ // UnregisterProcessor required to lock mutex. Call this API only if required.
if(needCheckUnregisterProcessor)
{
UnregisterProcessor();
// Lock while popping task out from the queue
Mutex::ScopedLock lock(mCompletedTasksMutex);
- if(mCompletedTasks.empty())
+ AsyncTaskPtr nextCompletedTask = nullptr;
+
+ while(!mCompletedTasks.empty())
{
- return AsyncTaskPtr();
- }
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
+ auto next = mCompletedTasks.begin();
+ AsyncTaskPtr nextTask = next->first;
+ CompletedTaskState taskState = next->second;
+ CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
+ mCompletedTasks.erase(next);
- auto next = mCompletedTasks.begin();
- AsyncTaskPtr nextTask = *next;
- CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
- mCompletedTasks.erase(next);
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextTask.Get());
+ if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
+ {
+ nextCompletedTask = nextTask;
+ break;
+ }
+ }
- return nextTask;
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
+
+ return nextCompletedTask;
}
void AsyncTaskManager::UnregisterProcessor()
// Keep processor at least 1 task exist.
// Please be careful the order of mutex, to avoid dead lock.
// TODO : Should we lock all mutex rightnow?
- Mutex::ScopedLock lock1(mWaitingTasksMutex);
+ Mutex::ScopedLock lockWait(mWaitingTasksMutex);
if(mWaitingTasks.empty())
{
- Mutex::ScopedLock lock2(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+ Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
if(mRunningTasks.empty())
{
- Mutex::ScopedLock lock3(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
+ Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
if(mCompletedTasks.empty())
{
- Dali::Adaptor::Get().UnregisterProcessor(*this);
mProcessorRegistered = false;
+ Dali::Adaptor::Get().UnregisterProcessor(*this);
}
}
}
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
CallbackBase::Execute(*(task->GetCompletedCallback()), task);
}
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
UnregisterProcessor();
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
}
void AsyncTaskManager::Process(bool postProcessor)
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
- auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, false));
+ auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
+ CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
+ mWaitingTasks.erase(iter);
+
// Decrease avaliable task counts if it is low priority
if(priorityType == AsyncTask::PriorityType::LOW)
{
// Decrease the number of waiting tasks for high priority.
--mWaitingHighProirityTaskCounts;
}
-
- CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
- mWaitingTasks.erase(iter);
break;
}
}
}
/// Worker thread called
-void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
+void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
{
- // Lock while adding task to the queue
- {
- Mutex::ScopedLock lock(mRunningTasksMutex);
+ bool notify = false;
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p]\n", task.Get());
+ if(task)
+ {
+ bool needTrigger = (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
- // Note : The number of mRunningTasks size will not be over than thread count. Just linear iterate.
- for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
+ // Lock while check validation of task.
{
- if((*iter).first == task)
+ Mutex::ScopedLock lock(mRunningTasksMutex);
+
+ auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+ if(mapIter != mCacheImpl->mRunningTasksCache.end())
{
- if(!(*iter).second)
+ const auto cacheIter = mapIter->second.begin();
+ DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
+
+ const auto iter = *cacheIter;
+ DALI_ASSERT_DEBUG(iter->first == task);
+ if(iter->second == RunningTaskState::RUNNING)
{
- if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
- {
- Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+ // This task is valid.
+ notify = true;
+ }
+ }
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
+ }
- auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
- CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
- }
+ // We should execute this tasks complete callback out of mutex
+ if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
+ {
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
+ CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+ }
- CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
- }
+ // Lock while adding task to the queue
+ {
+ Mutex::ScopedLock lock(mRunningTasksMutex);
+ auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+ if(mapIter != mCacheImpl->mRunningTasksCache.end())
+ {
+ const auto cacheIter = mapIter->second.begin();
+ DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
+
+ const auto iter = *cacheIter;
+ const auto priorityType = iter->first->GetPriorityType();
// Increase avaliable task counts if it is low priority
- const auto priorityType = task->GetPriorityType();
if(priorityType == AsyncTask::PriorityType::LOW)
{
// We are under running task mutex. We can increase it.
++mAvaliableLowPriorityTaskCounts;
}
- // Delete this task in running queue
- mRunningTasks.erase(iter);
- break;
+ // Move task into completed, for ensure that AsyncTask destroy at main thread.
+ {
+ Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+
+ const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
+
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
+
+ auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
+ CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
+
+ CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
+ mRunningTasks.erase(iter);
+
+ if(!needTrigger)
+ {
+ needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
+ }
+
+ // Now, task is invalidate.
+ task.Reset();
+ }
}
}
- }
- // wake up the main thread
- if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
- mTrigger->Trigger();
- }
- else
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
- CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+ // Wake up the main thread
+ if(needTrigger)
+ {
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
+ mTrigger->Trigger();
+ }
}
}