#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>
-#include <dali/public-api/common/vector-wrapper.h>
#include <unordered_map>
auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
constexpr auto MAX_NUMBER_OF_THREADS = 16u;
- DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
- return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
+ DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
+ return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
}
size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
template<typename CacheContainer, typename Iterator>
static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
{
- cacheMap[task.Get()].push_back(iterator);
+ auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
+ cacheContainer.insert(cacheContainer.end(), iterator);
}
/**
public:
AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
- // TODO : Can't we use std::set instead of std::vector?
- // It will be slowdown If someone AddTask multiple times for same task.
- using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncTaskContainer::iterator>>;
- using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncRunningTaskContainer::iterator>>;
+ // Keep cache iterators as list since we take tasks by FIFO as default.
+ using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
+ using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
{
if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
+ mProcessorRegistered = false;
Dali::Adaptor::Get().UnregisterProcessor(*this);
}
// If all threads are busy, then it's ok just to push the task because they will try to get the next job.
}
+ // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
Dali::Adaptor::Get().RegisterProcessor(*this);
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
// Check whether we need to unregister processor.
- // If there is some non-empty queue exist, we don't need to check unregister.
+ // If there is some non-empty queue exist, we don't need to unregister processor.
bool needCheckUnregisterProcessor = true;
{
{
for(auto iterator : mapIter->second)
{
+ DALI_ASSERT_DEBUG((*iterator) == task);
if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
{
// Decrease the number of waiting tasks for high priority.
{
for(auto iterator : mapIter->second)
{
- // We cannot erase container. Just mark as erased.
+ DALI_ASSERT_DEBUG((*iterator).first == task);
+ // We cannot erase container. Just mark as canceled.
// Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
- iterator->second = true;
+ (*iterator).second = RunningTaskState::CANCELED;
}
- CacheImpl::EraseAllTaskCache(mCacheImpl->mRunningTasksCache, task);
}
if(!mRunningTasks.empty())
{
for(auto iterator : mapIter->second)
{
+ DALI_ASSERT_DEBUG((*iterator) == task);
mCompletedTasks.erase(iterator);
}
CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
}
}
- // UnregisterProcessor required to lock mutex. Call that API only if required.
+ // UnregisterProcessor required to lock mutex. Call this API only if required.
if(needCheckUnregisterProcessor)
{
UnregisterProcessor();
// Keep processor at least 1 task exist.
// Please be careful the order of mutex, to avoid dead lock.
// TODO : Should we lock all mutex rightnow?
- Mutex::ScopedLock lock1(mWaitingTasksMutex);
+ Mutex::ScopedLock lockWait(mWaitingTasksMutex);
if(mWaitingTasks.empty())
{
- Mutex::ScopedLock lock2(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+ Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
if(mRunningTasks.empty())
{
- Mutex::ScopedLock lock3(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
+ Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
if(mCompletedTasks.empty())
{
- Dali::Adaptor::Get().UnregisterProcessor(*this);
mProcessorRegistered = false;
+ Dali::Adaptor::Get().UnregisterProcessor(*this);
}
}
}
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
CallbackBase::Execute(*(task->GetCompletedCallback()), task);
}
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
UnregisterProcessor();
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
}
void AsyncTaskManager::Process(bool postProcessor)
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
- auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, false));
+ auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
// Decrease avaliable task counts if it is low priority
/// Worker thread called
void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
{
+ bool notify = false;
+
// Lock while adding task to the queue
+ if(task)
{
Mutex::ScopedLock lock(mRunningTasksMutex);
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p]\n", task.Get());
-
- // Note : The number of mRunningTasks size will not be over than thread count. Just linear iterate.
- for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
+ auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+ if(mapIter != mCacheImpl->mRunningTasksCache.end())
{
- if((*iter).first == task)
+ const auto cacheIter = mapIter->second.begin();
+ DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
+
+ const auto iter = *cacheIter;
+ DALI_ASSERT_DEBUG(iter->first == task);
+ if(iter->second == RunningTaskState::RUNNING)
{
- if(!(*iter).second)
- {
- if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
- {
- Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+ // This task is valid.
+ notify = true;
+ }
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
+ const auto priorityType = iter->first->GetPriorityType();
+ // Increase avaliable task counts if it is low priority
+ if(priorityType == AsyncTask::PriorityType::LOW)
+ {
+ // We are under running task mutex. We can increase it.
+ ++mAvaliableLowPriorityTaskCounts;
+ }
+ CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
+ mRunningTasks.erase(iter);
+ }
- auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
- CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
- }
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
- CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
- }
+ // We should move the task to compeleted task under mRunningTaskMutex.
+ if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+ {
+ Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
- // Increase avaliable task counts if it is low priority
- const auto priorityType = task->GetPriorityType();
- if(priorityType == AsyncTask::PriorityType::LOW)
- {
- // We are under running task mutex. We can increase it.
- ++mAvaliableLowPriorityTaskCounts;
- }
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
- // Delete this task in running queue
- mRunningTasks.erase(iter);
- break;
- }
+ auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
+ CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
}
}
- // wake up the main thread
- if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+ // We should execute this tasks complete callback out of mutex
+ if(notify)
{
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
- mTrigger->Trigger();
- }
- else
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
- CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+ if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+ {
+ // wake up the main thread
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
+ mTrigger->Trigger();
+ }
+ else // task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD
+ {
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
+ CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+ }
}
}