}
}
-// AsyncTaskManager::TasksCompletedImpl
-
-struct AsyncTaskManager::TasksCompletedImpl
-{
- TasksCompletedImpl(AsyncTaskManager& manager)
- : mManager(manager),
- mEmitCompletedTaskRegistered(false)
- {
- }
-
-public:
- /**
- * @brief Create new tasks completed id and.
- * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
- * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
- */
- Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
- {
- // Lock while adding tasks completed callback list to the queue
- Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
-
- auto id = mTasksCompletedCount++;
- DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
-
- mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "GenerateTasksCompletedId id[%u] callback[%p]\n", id, callback);
- return id;
- }
-
- /**
- * @brief Append task that will be trace.
- * @post RemoveTaskTrace should be called.
- * @param[in] id The id of tasks completed.
- * @param[in] task The task want to trace.
- */
- void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AppendTaskTrace id[%u] task[%p]\n", id, task.Get());
-
- // Lock while adding tasks completed callback list to the queue
- Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
-
- auto iter = mTasksCompletedCallbackList.find(id);
- if(iter == mTasksCompletedCallbackList.end())
- {
- // This task is already erased. Ignore.
- return;
- }
-
- auto& callbackData = iter->second;
-
- auto jter = callbackData.mTasks.find(task.Get());
-
- if(jter != callbackData.mTasks.end())
- {
- // Increase reference count.
- ++(jter->second);
- }
- else
- {
- callbackData.mTasks.insert({task.Get(), 1u});
- }
- }
-
- /**
- * @brief Remove all task that were traced.
- * @param[in] task The task want to remove trace.
- * @param[in] taskCount The number of tasks that will be removed.
- */
- void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
- {
- if(count == 0u)
- {
- return;
- }
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace task[%p] remove count[%u]\n", task.Get(), count);
-
- // Lock while removing tasks completed callback list to the queue
- Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
-
- for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
- {
- auto& callbackData = iter->second;
- bool eraseCallbackData = false;
-
- auto jter = callbackData.mTasks.find(task.Get());
-
- if(jter != callbackData.mTasks.end())
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task[%p], current refcount[%u]\n", iter->first, task.Get(), (jter->second));
-
- if(jter->second <= count)
- {
- callbackData.mTasks.erase(jter);
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task erased. remained tasks[%zu]", iter->first, callbackData.mTasks.size());
-
- if(callbackData.mTasks.empty())
- {
- eraseCallbackData = true;
-
- // Move callback base into list.
- // (To avoid task container changed during callback emit)
- RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
-
- iter = mTasksCompletedCallbackList.erase(iter);
- }
- }
- else
- {
- jter->second -= count;
- }
- }
-
- if(!eraseCallbackData)
- {
- ++iter;
- }
- }
- }
-
- /**
- * @brief Check whether current TasksCompletedId completed or not.
- * @param[in] id The id of tasks completed.
- * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
- */
- bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CheckTasksCompletedCallbackCompleted[%u]\n", id);
-
- // Lock while removing tasks completed callback list to the queue
- Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
-
- auto iter = mTasksCompletedCallbackList.find(id);
- if(iter != mTasksCompletedCallbackList.end())
- {
- auto& callbackData = iter->second;
- if(callbackData.mTasks.empty())
- {
- // Move callback base into list.
- // (To avoid task container changed during callback emit)
- RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
-
- iter = mTasksCompletedCallbackList.erase(iter);
-
- return true;
- }
- }
-
- return false;
- }
-
- /**
- * @brief Remove taskS completed callbacks by id.
- * @param[in] id The id of taskS completed.
- * @return True if taskS completed id removed. False otherwise.
- */
- bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTasksCompleted[%u]\n", id);
-
- // Lock while removing taskS completed callback list to the queue
- Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
-
- auto iter = mTasksCompletedCallbackList.find(id);
- if(iter == mTasksCompletedCallbackList.end())
- {
- // This task is already erased, or completed.
- // Erase from completed excute callback list.
-
- // Lock while removing excute callback list to the queue
- Mutex::ScopedLock lock(mExcuteCallbacksMutex);
-
- for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
- {
- if(iter->second == id)
- {
- iter = mExcuteCallbackList.erase(iter);
-
- return true;
- }
- else
- {
- ++iter;
- }
- }
-
- // This task is alread erased and completed. Ignore.
- return false;
- }
-
- mTasksCompletedCallbackList.erase(iter);
-
- return true;
- }
-
- /**
- * @brief Emit all completed callbacks.
- */
- void EmitCompletedTasks()
- {
- ExecuteCallbackContainer executeCallbackList;
- {
- // Lock while removing excute callback list to the queue
- Mutex::ScopedLock lock(mExcuteCallbacksMutex);
-
- mEmitCompletedTaskRegistered = false;
-
- // Copy callback lists, for let we execute callbacks out of mutex
- executeCallbackList = std::move(mExcuteCallbackList);
- mExcuteCallbackList.clear();
- }
-
- if(!executeCallbackList.empty())
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback count[%zu]\n", executeCallbackList.size());
- // Execute all callbacks
- for(auto&& callbackPair : executeCallbackList)
- {
- auto& callback = callbackPair.first;
- auto id = callbackPair.second;
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute taskS completed callback[%p] for id[%u]\n", callback.get(), id);
-
- Dali::CallbackBase::Execute(*callback, id);
- }
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback end\n");
- }
- }
-
- /**
- * @brief Check whether there is some completed signal what we need to trace, or not.
- * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
- */
- bool IsTasksCompletedCallbackExist()
- {
- Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
- return !mTasksCompletedCallbackList.empty();
- }
-
- /**
- * @brief Check whether there is some completed signal what we need to execute, or not.
- * @return True if mExcuteCallbackList is not empty. False otherwise.
- */
- bool IsExecuteCallbackExist()
- {
- Mutex::ScopedLock lock(mExcuteCallbacksMutex);
- return !mExcuteCallbackList.empty();
- }
-
-private:
- void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted[%u] need to be execute with callback[%p]\n", id, callback.get());
-
- // Lock while adding excute callback list to the queue
- Mutex::ScopedLock lock(mExcuteCallbacksMutex);
-
- mExcuteCallbackList.emplace_back(std::move(callback), id);
-
- if(!mEmitCompletedTaskRegistered)
- {
- mEmitCompletedTaskRegistered = true;
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Register processor\n");
- mManager.RegisterProcessor();
- }
- }
-
-private:
- struct CallbackData
- {
- public:
- CallbackData(CallbackBase* callback)
- : mCallback(callback),
- mTasks()
- {
- }
-
- CallbackData(CallbackData&& rhs) noexcept
- : mCallback(std::move(rhs.mCallback)),
- mTasks(std::move(rhs.mTasks))
- {
- }
-
- CallbackData& operator=(CallbackData&& rhs) noexcept
- {
- if(this != &rhs)
- {
- mCallback = std::move(rhs.mCallback);
- mTasks = std::move(rhs.mTasks);
- }
-
- return *this;
- }
-
- private:
- // Delete copy operator.
- CallbackData(const CallbackData& rhs) = delete;
- CallbackData& operator=(const CallbackData& rhs) = delete;
-
- public:
- std::unique_ptr<CallbackBase> mCallback;
- std::unordered_map<const AsyncTask*, uint32_t> mTasks;
- };
-
-private:
- AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
-
- Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
-
- using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
- TasksCompletedContainer mTasksCompletedCallbackList;
-
- using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
- ExecuteCallbackContainer mExcuteCallbackList;
-
- Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
- Dali::Mutex mExcuteCallbacksMutex; ///< Mutex for mExcuteCallbackList.
-
- bool mEmitCompletedTaskRegistered : 1;
-};
-
// AsyncTaskManager::CacheImpl
struct AsyncTaskManager::CacheImpl
: mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
mWaitingHighProirityTaskCounts(0u),
- mTasksCompletedImpl(new TasksCompletedImpl(*this)),
mCacheImpl(new CacheImpl(*this)),
mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
mProcessorRegistered(false)
}
// Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
- RegisterProcessor();
+ if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
+ {
+ Dali::Adaptor::Get().RegisterProcessor(*this);
+ mProcessorRegistered = true;
+ }
return;
}
// If there is some non-empty queue exist, we don't need to unregister processor.
bool needCheckUnregisterProcessor = true;
- uint32_t removedCount = 0u;
-
{
// Lock while remove task from the queue
Mutex::ScopedLock lock(mWaitingTasksMutex);
--mWaitingHighProirityTaskCounts;
}
mWaitingTasks.erase(iterator);
- ++removedCount;
}
CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
}
// We cannot erase container. Just mark as canceled.
// Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
(*iterator).second = RunningTaskState::CANCELED;
- ++removedCount;
}
}
{
DALI_ASSERT_DEBUG(iterator->first == task);
mCompletedTasks.erase(iterator);
- ++removedCount;
}
CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
}
}
}
- // Remove TasksCompleted callback trace
- if(removedCount > 0u)
- {
- mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
- }
-
// UnregisterProcessor required to lock mutex. Call this API only if required.
if(needCheckUnregisterProcessor)
{
}
}
-Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
-{
- // mTasksCompletedImpl will take ownership of callback.
- Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
-
- bool taskAdded = false; ///< Flag whether at least one task tracing now.
-
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "SetCompletedCallback id : %u, mask : %d\n", tasksCompletedId, static_cast<int32_t>(mask));
-
- // Please be careful the order of mutex, to avoid dead lock.
- {
- Mutex::ScopedLock lockWait(mWaitingTasksMutex);
- {
- Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
- {
- Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
-
- // Collect all tasks from waiting tasks
- for(auto& task : mWaitingTasks)
- {
- auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
- (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
-
- if((checkMask & mask) == checkMask)
- {
- taskAdded = true;
- mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
- }
- }
-
- // Collect all tasks from running tasks
- for(auto& taskPair : mRunningTasks)
- {
- auto& task = taskPair.first;
- auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
- (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
-
- if((checkMask & mask) == checkMask)
- {
- taskAdded = true;
- mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
- }
- }
-
- // Collect all tasks from complete tasks
- for(auto& taskPair : mCompletedTasks)
- {
- auto& task = taskPair.first;
- auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
- (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
-
- if((checkMask & mask) == checkMask)
- {
- taskAdded = true;
- mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
- }
- }
- }
- }
- }
-
- // If there is nothing to check task, just excute callback right now.
- if(!taskAdded)
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompletedCallback id[%u] executed now due to no task exist\n", tasksCompletedId);
-
- mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
- }
- return tasksCompletedId;
-}
-
-bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
-{
- return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
-}
-
AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
{
std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
return nextCompletedTask;
}
-void AsyncTaskManager::RegisterProcessor()
-{
- if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
- {
- Dali::Adaptor::Get().RegisterProcessor(*this);
- mProcessorRegistered = true;
- }
-}
-
void AsyncTaskManager::UnregisterProcessor()
{
if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
CallbackBase::Execute(*(task->GetCompletedCallback()), task);
-
- // Remove TasksCompleted callback trace
- mTasksCompletedImpl->RemoveTaskTrace(task);
}
UnregisterProcessor();
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
-
- mTasksCompletedImpl->EmitCompletedTasks();
}
void AsyncTaskManager::Process(bool postProcessor)
{
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
CallbackBase::Execute(*(task->GetCompletedCallback()), task);
-
- // We need to remove task trace now.
- if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
- {
- mTasksCompletedImpl->RemoveTaskTrace(task);
-
- if(mTasksCompletedImpl->IsExecuteCallbackExist())
- {
- // We need to call EmitCompletedTasks(). Trigger main thread.
- needTrigger = true;
- }
- }
}
// Lock while adding task to the queue