#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>
-#include <dali/public-api/common/vector-wrapper.h>
-
-#include <unordered_map>
namespace Dali
{
{
auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
- constexpr auto MAX_NUMBER_OF_THREADS = 16u;
+ constexpr auto MAX_NUMBER_OF_THREADS = 10u;
DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
}
#if defined(DEBUG_ENABLED)
Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
-
-uint32_t gThreadId = 0u; // Only for debug
#endif
} // unnamed namespace
-// AsyncTaskThread
-
AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
: mConditionalWait(),
mAsyncTaskManager(asyncTaskManager),
void AsyncTaskThread::Run()
{
-#if defined(DEBUG_ENABLED)
- uint32_t threadId = gThreadId++;
- {
- char temp[100];
- snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
- SetThreadName(temp);
- }
-#else
SetThreadName("AsyncTaskThread");
-#endif
mLogFactory.InstallLogFunction();
while(!mDestroyThread)
if(!mDestroyThread)
{
mIsThreadIdle = true;
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
mConditionalWait.Wait(lock);
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
}
}
else
{
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
task->Process();
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
- if(!mDestroyThread)
- {
- mAsyncTaskManager.CompleteTask(task);
- }
+ mAsyncTaskManager.CompleteTask(task);
}
}
}
-// AsyncTaskManager::CacheImpl
-
-struct AsyncTaskManager::CacheImpl
-{
- CacheImpl(AsyncTaskManager& manager)
- : mManager(manager)
- {
- }
-
-public:
- // Insert / Erase task cache API.
-
- /**
- * @brief Insert cache that input task.
- * @pre Mutex be locked.
- */
- template<typename CacheContainer, typename Iterator>
- static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
- {
- cacheMap[task.Get()].push_back(iterator);
- }
-
- /**
- * @brief Erase cache that input task.
- * @pre Mutex be locked.
- */
- template<typename CacheContainer, typename Iterator>
- static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
- {
- auto mapIter = cacheMap.find(task.Get());
- if(mapIter != cacheMap.end())
- {
- auto& cacheContainer = (*mapIter).second;
- auto cacheIter = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
-
- if(cacheIter != cacheContainer.end())
- {
- cacheContainer.erase(cacheIter);
- if(cacheContainer.empty())
- {
- cacheMap.erase(mapIter);
- }
- }
- }
- }
-
- /**
- * @brief Erase all cache that input task.
- * @pre Mutex be locked.
- */
- template<typename CacheContainer>
- static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
- {
- auto mapIter = cacheMap.find(task.Get());
- if(mapIter != cacheMap.end())
- {
- cacheMap.erase(mapIter);
- }
- }
-
-public:
- AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
-
- // TODO : Can't we use std::set instead of std::vector?
- // It will be slowdown If someone AddTask multiple times for same task.
- using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncTaskContainer::iterator>>;
- using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncRunningTaskContainer::iterator>>;
-
- TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
- RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
- TaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
-};
-
-// AsyncTaskManager
-
Dali::AsyncTaskManager AsyncTaskManager::Get()
{
Dali::AsyncTaskManager manager;
AsyncTaskManager::AsyncTaskManager()
: mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
- mWaitingHighProirityTaskCounts(0u),
- mCacheImpl(new CacheImpl(*this)),
mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
mProcessorRegistered(false)
{
if(task)
{
// Lock while adding task to the queue
- Mutex::ScopedLock lock(mWaitingTasksMutex);
+ Mutex::ScopedLock lock(mMutex);
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
+ mWaitingTasks.push_back(task);
- // push back into waiting queue.
- auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
- CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
-
- if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
+ // Finish all Running threads are working
+ if(mRunningTasks.size() >= mTasks.GetElementCount())
{
- // Increase the number of waiting tasks for high priority.
- ++mWaitingHighProirityTaskCounts;
- }
-
- {
- // For thread safety
- Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
-
- // Finish all Running threads are working
- if(mRunningTasks.size() >= mTasks.GetElementCount())
- {
- return;
- }
+ return;
}
}
{
if(task)
{
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
-
- // Check whether we need to unregister processor.
- // If there is some non-empty queue exist, we don't need to check unregister.
- bool needCheckUnregisterProcessor = true;
+ // Lock while remove task from the queue
+ Mutex::ScopedLock lock(mMutex);
+ if(!mWaitingTasks.empty())
{
- // Lock while remove task from the queue
- Mutex::ScopedLock lock(mWaitingTasksMutex);
-
- auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
- if(mapIter != mCacheImpl->mWaitingTasksCache.end())
+ for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
{
- for(auto iterator : mapIter->second)
+ if((*it) && (*it) == task)
{
- if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
- {
- // Decrease the number of waiting tasks for high priority.
- --mWaitingHighProirityTaskCounts;
- }
- mWaitingTasks.erase(iterator);
+ it = mWaitingTasks.erase(it);
}
- CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
- }
-
- if(!mWaitingTasks.empty())
- {
- needCheckUnregisterProcessor = false;
- }
- }
-
- {
- // Lock while remove task from the queue
- Mutex::ScopedLock lock(mRunningTasksMutex);
-
- auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
- if(mapIter != mCacheImpl->mRunningTasksCache.end())
- {
- for(auto iterator : mapIter->second)
+ else
{
- // We cannot erase container. Just mark as erased.
- // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
- iterator->second = true;
+ it++;
}
- CacheImpl::EraseAllTaskCache(mCacheImpl->mRunningTasksCache, task);
- }
-
- if(!mRunningTasks.empty())
- {
- needCheckUnregisterProcessor = false;
}
}
+ if(!mRunningTasks.empty())
{
- // Lock while remove task from the queue
- Mutex::ScopedLock lock(mCompletedTasksMutex);
-
- auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
- if(mapIter != mCacheImpl->mCompletedTasksCache.end())
+ for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
{
- for(auto iterator : mapIter->second)
+ if((*iter).first == task)
{
- mCompletedTasks.erase(iterator);
+ (*iter).second = true;
}
- CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
- }
-
- if(!mCompletedTasks.empty())
- {
- needCheckUnregisterProcessor = false;
}
}
- // UnregisterProcessor required to lock mutex. Call that API only if required.
- if(needCheckUnregisterProcessor)
- {
- UnregisterProcessor();
- }
- }
-}
-
-AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
-{
- // Lock while popping task out from the queue
- Mutex::ScopedLock lock(mCompletedTasksMutex);
-
- if(mCompletedTasks.empty())
- {
- return AsyncTaskPtr();
- }
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
-
- auto next = mCompletedTasks.begin();
- AsyncTaskPtr nextTask = *next;
- CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
- mCompletedTasks.erase(next);
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextTask.Get());
-
- return nextTask;
-}
-
-void AsyncTaskManager::UnregisterProcessor()
-{
- if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
- // Keep processor at least 1 task exist.
- // Please be careful the order of mutex, to avoid dead lock.
- // TODO : Should we lock all mutex rightnow?
- Mutex::ScopedLock lock1(mWaitingTasksMutex);
- if(mWaitingTasks.empty())
+ if(!mCompletedTasks.empty())
{
- Mutex::ScopedLock lock2(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
- if(mRunningTasks.empty())
+ for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
{
- Mutex::ScopedLock lock3(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
- if(mCompletedTasks.empty())
+ if((*it) && (*it) == task)
+ {
+ it = mCompletedTasks.erase(it);
+ }
+ else
{
- Dali::Adaptor::Get().UnregisterProcessor(*this);
- mProcessorRegistered = false;
+ it++;
}
}
}
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
- }
-}
-
-void AsyncTaskManager::TasksCompleted()
-{
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
- while(AsyncTaskPtr task = PopNextCompletedTask())
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
- CallbackBase::Execute(*(task->GetCompletedCallback()), task);
}
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
UnregisterProcessor();
}
-void AsyncTaskManager::Process(bool postProcessor)
-{
- TasksCompleted();
-}
-
-/// Worker thread called
AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
{
// Lock while popping task out from the queue
- Mutex::ScopedLock lock(mWaitingTasksMutex);
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
+ Mutex::ScopedLock lock(mMutex);
// pop out the next task from the queue
AsyncTaskPtr nextTask = nullptr;
- // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
- if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
- {
- // For thread safety
- Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
-
- if(mAvaliableLowPriorityTaskCounts == 0u)
- {
- // There are no avaliabe tasks to run now. Return nullptr.
- return nextTask;
- }
- }
-
for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
{
if((*iter)->IsReady())
{
const auto priorityType = (*iter)->GetPriorityType();
- bool taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
- if(!taskAvaliable)
- {
- // For thread safety
- Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
-
- taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
- }
+ const bool taskAvaliable = (priorityType == AsyncTask::PriorityType::HIGH) || // Task always valid if it's priority is high
+ (mAvaliableLowPriorityTaskCounts > 0u); // or priority is low, but we can use it.
if(taskAvaliable)
{
nextTask = *iter;
// Add Running queue
- {
- // Lock while popping task out from the queue
- Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
-
- auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, false));
- CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
-
- // Decrease avaliable task counts if it is low priority
- if(priorityType == AsyncTask::PriorityType::LOW)
- {
- // We are under running task mutex. We can decrease it.
- --mAvaliableLowPriorityTaskCounts;
- }
- }
+ mRunningTasks.push_back(std::make_pair(nextTask, false));
+ mWaitingTasks.erase(iter);
- if(priorityType == AsyncTask::PriorityType::HIGH)
+ // Decrease avaliable task counts if it is low priority
+ if(priorityType == AsyncTask::PriorityType::LOW)
{
- // Decrease the number of waiting tasks for high priority.
- --mWaitingHighProirityTaskCounts;
+ --mAvaliableLowPriorityTaskCounts;
}
-
- CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
- mWaitingTasks.erase(iter);
break;
}
}
}
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
+ return nextTask;
+}
+
+AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
+{
+ // Lock while popping task out from the queue
+ Mutex::ScopedLock lock(mMutex);
+
+ if(mCompletedTasks.empty())
+ {
+ return AsyncTaskPtr();
+ }
+
+ std::vector<AsyncTaskPtr>::iterator next = mCompletedTasks.begin();
+ AsyncTaskPtr nextTask = *next;
+ mCompletedTasks.erase(next);
return nextTask;
}
-/// Worker thread called
void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
{
// Lock while adding task to the queue
{
- Mutex::ScopedLock lock(mRunningTasksMutex);
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p]\n", task.Get());
+ Mutex::ScopedLock lock(mMutex);
- // Note : The number of mRunningTasks size will not be over than thread count. Just linear iterate.
for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
{
if((*iter).first == task)
{
if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
{
- Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
-
- auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
- CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
+ mCompletedTasks.push_back(task);
}
-
- CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
}
+ // Delete this task in running queue
+ mRunningTasks.erase(iter);
+
// Increase avaliable task counts if it is low priority
const auto priorityType = task->GetPriorityType();
if(priorityType == AsyncTask::PriorityType::LOW)
{
- // We are under running task mutex. We can increase it.
++mAvaliableLowPriorityTaskCounts;
}
-
- // Delete this task in running queue
- mRunningTasks.erase(iter);
break;
}
}
// wake up the main thread
if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
{
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
mTrigger->Trigger();
}
else
{
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
CallbackBase::Execute(*(task->GetCompletedCallback()), task);
}
}
-// AsyncTaskManager::TaskHelper
+void AsyncTaskManager::UnregisterProcessor()
+{
+ if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
+ {
+ Mutex::ScopedLock lock(mMutex);
+ if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
+ {
+ Dali::Adaptor::Get().UnregisterProcessor(*this);
+ mProcessorRegistered = false;
+ }
+ }
+}
+
+void AsyncTaskManager::TasksCompleted()
+{
+ while(AsyncTaskPtr task = PopNextCompletedTask())
+ {
+ CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+ }
+
+ UnregisterProcessor();
+}
+
+void AsyncTaskManager::Process(bool postProcessor)
+{
+ TasksCompleted();
+}
AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
: TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)