2 * Copyright (c) 2023 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <dali/internal/system/common/async-task-manager-impl.h>
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
28 #include <unordered_map>
38 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
40 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
41 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
43 // The number of threads for low priority task.
44 constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
45 constexpr auto NUMBER_OF_LOW_PRIORITY_THREADS_ENV = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
47 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
49 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
50 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
51 constexpr auto MAX_NUMBER_OF_THREADS = 16u;
52 DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
53 return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
56 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
58 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
59 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
60 DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
61 return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
64 #if defined(DEBUG_ENABLED)
65 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
67 uint32_t gThreadId = 0u; // Only for debug
70 } // unnamed namespace
74 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
76 mAsyncTaskManager(asyncTaskManager),
77 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
78 mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
79 mDestroyThread(false),
80 mIsThreadStarted(false),
85 AsyncTaskThread::~AsyncTaskThread()
89 ConditionalWait::ScopedLock lock(mConditionalWait);
90 mDestroyThread = true;
91 mConditionalWait.Notify(lock);
97 bool AsyncTaskThread::Request()
102 mIsThreadStarted = true;
106 // Lock while adding task to the queue
107 ConditionalWait::ScopedLock lock(mConditionalWait);
111 mIsThreadIdle = false;
113 // wake up the thread
114 mConditionalWait.Notify(lock);
122 void AsyncTaskThread::Run()
124 #if defined(DEBUG_ENABLED)
125 uint32_t threadId = gThreadId++;
128 snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
132 SetThreadName("AsyncTaskThread");
134 mLogFactory.InstallLogFunction();
135 mTraceFactory.InstallTraceFunction();
137 while(!mDestroyThread)
139 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
142 ConditionalWait::ScopedLock lock(mConditionalWait);
145 mIsThreadIdle = true;
146 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
147 mConditionalWait.Wait(lock);
148 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
153 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
155 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
158 mAsyncTaskManager.CompleteTask(std::move(task));
164 // AsyncTaskManager::TasksCompletedImpl
166 struct AsyncTaskManager::TasksCompletedImpl
168 TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
171 mEmitCompletedTaskTriggered(false)
177 * @brief Create new tasks completed id and.
178 * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
179 * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
181 Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
183 // Lock while adding tasks completed callback list to the queue
184 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
186 auto id = mTasksCompletedCount++;
187 DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
189 mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
191 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "GenerateTasksCompletedId id[%u] callback[%p]\n", id, callback);
196 * @brief Append task that will be trace.
197 * @post RemoveTaskTrace should be called.
198 * @param[in] id The id of tasks completed.
199 * @param[in] task The task want to trace.
201 void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
203 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AppendTaskTrace id[%u] task[%p]\n", id, task.Get());
205 // Lock while adding tasks completed callback list to the queue
206 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
208 auto iter = mTasksCompletedCallbackList.find(id);
209 if(iter == mTasksCompletedCallbackList.end())
211 // This task is already erased. Ignore.
215 auto& callbackData = iter->second;
217 auto jter = callbackData.mTasks.find(task.Get());
219 if(jter != callbackData.mTasks.end())
221 // Increase reference count.
226 callbackData.mTasks.insert({task.Get(), 1u});
231 * @brief Remove all task that were traced.
232 * @param[in] task The task want to remove trace.
233 * @param[in] taskCount The number of tasks that will be removed.
235 void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
241 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace task[%p] remove count[%u]\n", task.Get(), count);
243 // Lock while removing tasks completed callback list to the queue
244 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
246 for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
248 auto& callbackData = iter->second;
249 bool eraseCallbackData = false;
251 auto jter = callbackData.mTasks.find(task.Get());
253 if(jter != callbackData.mTasks.end())
255 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task[%p], current refcount[%u]\n", iter->first, task.Get(), (jter->second));
257 if(jter->second <= count)
259 callbackData.mTasks.erase(jter);
261 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task erased. remained tasks[%zu]", iter->first, callbackData.mTasks.size());
263 if(callbackData.mTasks.empty())
265 eraseCallbackData = true;
267 // Move callback base into list.
268 // (To avoid task container changed during callback emit)
269 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
271 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
273 iter = mTasksCompletedCallbackList.erase(iter);
278 jter->second -= count;
282 if(!eraseCallbackData)
290 * @brief Check whether current TasksCompletedId completed or not.
291 * @param[in] id The id of tasks completed.
292 * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
294 bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
296 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CheckTasksCompletedCallbackCompleted[%u]\n", id);
298 // Lock while removing tasks completed callback list to the queue
299 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
301 auto iter = mTasksCompletedCallbackList.find(id);
302 if(iter != mTasksCompletedCallbackList.end())
304 auto& callbackData = iter->second;
305 if(callbackData.mTasks.empty())
307 // Move callback base into list.
308 // (To avoid task container changed during callback emit)
309 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
311 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
313 iter = mTasksCompletedCallbackList.erase(iter);
323 * @brief Remove taskS completed callbacks by id.
324 * @param[in] id The id of taskS completed.
325 * @return True if taskS completed id removed. False otherwise.
327 bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
329 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTasksCompleted[%u]\n", id);
331 // Lock while removing taskS completed callback list to the queue
332 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
334 auto iter = mTasksCompletedCallbackList.find(id);
335 if(iter == mTasksCompletedCallbackList.end())
337 // This task is already erased, or completed.
338 // Erase from completed excute callback list.
340 // Lock while removing excute callback list to the queue
341 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
343 for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
345 if(iter->second == id)
347 iter = mExcuteCallbackList.erase(iter);
357 // This task is alread erased and completed. Ignore.
361 mTasksCompletedCallbackList.erase(iter);
367 * @brief Emit all completed callbacks.
368 * @note This API should be called at event thread.
370 void EmitCompletedTasks()
372 ExecuteCallbackContainer executeCallbackList;
374 // Lock while removing excute callback list to the queue
375 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
377 mEmitCompletedTaskTriggered = false;
379 // Copy callback lists, for let we execute callbacks out of mutex
380 executeCallbackList = std::move(mExcuteCallbackList);
381 mExcuteCallbackList.clear();
384 if(!executeCallbackList.empty())
386 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback count[%zu]\n", executeCallbackList.size());
387 // Execute all callbacks
388 for(auto&& callbackPair : executeCallbackList)
390 auto& callback = callbackPair.first;
391 auto id = callbackPair.second;
393 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute taskS completed callback[%p] for id[%u]\n", callback.get(), id);
395 Dali::CallbackBase::Execute(*callback, id);
398 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback end\n");
403 * @brief Check whether there is some completed signal what we need to trace, or not.
404 * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
406 bool IsTasksCompletedCallbackExist()
408 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
409 return !mTasksCompletedCallbackList.empty();
413 * @brief Check whether there is some completed signal what we need to execute, or not.
414 * @return True if mExcuteCallbackList is not empty. False otherwise.
416 bool IsExecuteCallbackExist()
418 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
419 return !mExcuteCallbackList.empty();
423 void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
425 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted[%u] need to be execute with callback[%p]\n", id, callback.get());
427 // Lock while adding excute callback list to the queue
428 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
430 mExcuteCallbackList.emplace_back(std::move(callback), id);
432 if(!mEmitCompletedTaskTriggered)
434 mEmitCompletedTaskTriggered = true;
436 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger processor\n");
445 CallbackData(CallbackBase* callback)
446 : mCallback(callback),
451 CallbackData(CallbackData&& rhs) noexcept
452 : mCallback(std::move(rhs.mCallback)),
453 mTasks(std::move(rhs.mTasks))
457 CallbackData& operator=(CallbackData&& rhs) noexcept
461 mCallback = std::move(rhs.mCallback);
462 mTasks = std::move(rhs.mTasks);
469 // Delete copy operator.
470 CallbackData(const CallbackData& rhs) = delete;
471 CallbackData& operator=(const CallbackData& rhs) = delete;
474 std::unique_ptr<CallbackBase> mCallback;
475 std::unordered_map<const AsyncTask*, uint32_t> mTasks;
479 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
480 EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
482 Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
484 using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
485 TasksCompletedContainer mTasksCompletedCallbackList;
487 using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
488 ExecuteCallbackContainer mExcuteCallbackList;
490 Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
491 Dali::Mutex mExcuteCallbacksMutex; ///< Mutex for mExcuteCallbackList.
493 bool mEmitCompletedTaskTriggered : 1;
496 // AsyncTaskManager::CacheImpl
498 struct AsyncTaskManager::CacheImpl
500 CacheImpl(AsyncTaskManager& manager)
506 // Insert / Erase task cache API.
509 * @brief Insert cache that input task.
510 * @pre Mutex be locked.
512 template<typename CacheContainer, typename Iterator>
513 static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
515 auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
516 cacheContainer.insert(cacheContainer.end(), iterator);
520 * @brief Erase cache that input task.
521 * @pre Mutex be locked.
523 template<typename CacheContainer, typename Iterator>
524 static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
526 auto mapIter = cacheMap.find(task.Get());
527 if(mapIter != cacheMap.end())
529 auto& cacheContainer = (*mapIter).second;
530 auto cacheIter = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
532 if(cacheIter != cacheContainer.end())
534 cacheContainer.erase(cacheIter);
535 if(cacheContainer.empty())
537 cacheMap.erase(mapIter);
544 * @brief Erase all cache that input task.
545 * @pre Mutex be locked.
547 template<typename CacheContainer>
548 static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
550 auto mapIter = cacheMap.find(task.Get());
551 if(mapIter != cacheMap.end())
553 cacheMap.erase(mapIter);
558 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
560 // Keep cache iterators as list since we take tasks by FIFO as default.
561 using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
562 using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
563 using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
565 TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
566 RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
567 CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
572 Dali::AsyncTaskManager AsyncTaskManager::Get()
574 Dali::AsyncTaskManager manager;
575 SingletonService singletonService(SingletonService::Get());
578 // Check whether the async task manager is already created
579 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
582 // If so, downcast the handle of singleton
583 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
588 // If not, create the async task manager and register it as a singleton
589 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
590 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
591 singletonService.Register(typeid(manager), manager);
597 AsyncTaskManager::AsyncTaskManager()
598 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
599 mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
600 mWaitingHighProirityTaskCounts(0u),
601 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
602 mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get())),
603 mCacheImpl(new CacheImpl(*this)),
604 mProcessorRegistered(false)
608 AsyncTaskManager::~AsyncTaskManager()
610 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
612 mProcessorRegistered = false;
613 Dali::Adaptor::Get().UnregisterProcessor(*this);
619 // Remove task completed impl and cache impl after all threads are join.
620 mTasksCompletedImpl.reset();
623 // Remove tasks after CacheImpl removed
624 mWaitingTasks.clear();
625 mRunningTasks.clear();
626 mCompletedTasks.clear();
629 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
633 // Lock while adding task to the queue
634 Mutex::ScopedLock lock(mWaitingTasksMutex);
636 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
638 // push back into waiting queue.
639 auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
640 CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
642 if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
644 // Increase the number of waiting tasks for high priority.
645 ++mWaitingHighProirityTaskCounts;
650 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
652 // Finish all Running threads are working
653 if(mRunningTasks.size() >= mTasks.GetElementCount())
660 size_t count = mTasks.GetElementCount();
662 while(index++ < count)
664 auto processHelperIt = mTasks.GetNext();
665 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
666 if(processHelperIt->Request())
670 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
673 // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
679 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
683 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
685 // Check whether we need to unregister processor.
686 // If there is some non-empty queue exist, we don't need to unregister processor.
687 bool needCheckUnregisterProcessor = true;
689 uint32_t removedCount = 0u;
692 // Lock while remove task from the queue
693 Mutex::ScopedLock lock(mWaitingTasksMutex);
695 auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
696 if(mapIter != mCacheImpl->mWaitingTasksCache.end())
698 for(auto& iterator : mapIter->second)
700 DALI_ASSERT_DEBUG((*iterator) == task);
701 if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
703 // Decrease the number of waiting tasks for high priority.
704 --mWaitingHighProirityTaskCounts;
706 mWaitingTasks.erase(iterator);
709 CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
712 if(!mWaitingTasks.empty())
714 needCheckUnregisterProcessor = false;
719 // Lock while remove task from the queue
720 Mutex::ScopedLock lock(mRunningTasksMutex);
722 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
723 if(mapIter != mCacheImpl->mRunningTasksCache.end())
725 for(auto& iterator : mapIter->second)
727 DALI_ASSERT_DEBUG((*iterator).first == task);
728 // We cannot erase container. Just mark as canceled.
729 // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
730 if((*iterator).second == RunningTaskState::RUNNING)
732 (*iterator).second = RunningTaskState::CANCELED;
738 if(!mRunningTasks.empty())
740 needCheckUnregisterProcessor = false;
745 // Lock while remove task from the queue
746 Mutex::ScopedLock lock(mCompletedTasksMutex);
748 auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
749 if(mapIter != mCacheImpl->mCompletedTasksCache.end())
751 for(auto& iterator : mapIter->second)
753 DALI_ASSERT_DEBUG((*iterator).first == task);
754 if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
758 mCompletedTasks.erase(iterator);
760 CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
763 if(!mCompletedTasks.empty())
765 needCheckUnregisterProcessor = false;
769 // Remove TasksCompleted callback trace
770 if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
772 mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
775 // UnregisterProcessor required to lock mutex. Call this API only if required.
776 if(needCheckUnregisterProcessor)
778 UnregisterProcessor();
783 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
785 // mTasksCompletedImpl will take ownership of callback.
786 Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
788 bool taskAdded = false; ///< Flag whether at least one task tracing now.
790 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "SetCompletedCallback id : %u, mask : %d\n", tasksCompletedId, static_cast<int32_t>(mask));
792 // Please be careful the order of mutex, to avoid dead lock.
794 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
796 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
798 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
800 // Collect all tasks from waiting tasks
801 for(auto& task : mWaitingTasks)
803 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
804 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
806 if((checkMask & mask) == checkMask)
809 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
813 // Collect all tasks from running tasks
814 for(auto& taskPair : mRunningTasks)
816 // Trace only if it is running now.
817 if(taskPair.second == RunningTaskState::RUNNING)
819 auto& task = taskPair.first;
820 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
821 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
823 if((checkMask & mask) == checkMask)
826 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
831 // Collect all tasks from complete tasks
832 for(auto& taskPair : mCompletedTasks)
834 // Trace only if it is need callback.
835 // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
836 // If worker thread invocation, than it already remove trace at completed timing.
837 // If canceled cases, we don't append trace at running tasks already.
838 // So, we don't need to trace for SKIP_CALLBACK cases.
839 if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
841 auto& task = taskPair.first;
842 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
843 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
845 if((checkMask & mask) == checkMask)
848 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
856 // If there is nothing to check task, just excute callback right now.
859 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompletedCallback id[%u] executed now due to no task exist\n", tasksCompletedId);
861 mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
863 return tasksCompletedId;
866 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
868 return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
871 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
873 std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
875 AsyncTaskPtr nextCompletedTask = nullptr;
877 // Lock while popping task out from the queue
878 Mutex::ScopedLock lock(mCompletedTasksMutex);
880 while(!mCompletedTasks.empty())
882 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
884 auto next = mCompletedTasks.begin();
885 AsyncTaskPtr nextTask = next->first;
886 CompletedTaskState taskState = next->second;
887 CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
888 mCompletedTasks.erase(next);
890 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
892 if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
894 nextCompletedTask = nextTask;
898 ignoredTaskList.push_back(nextTask);
901 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
904 return nextCompletedTask;
907 void AsyncTaskManager::RegisterProcessor()
909 if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
911 Dali::Adaptor::Get().RegisterProcessor(*this);
912 mProcessorRegistered = true;
916 void AsyncTaskManager::UnregisterProcessor()
918 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
920 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
921 // Keep processor at least 1 task exist.
922 // Please be careful the order of mutex, to avoid dead lock.
923 // TODO : Should we lock all mutex rightnow?
924 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
925 if(mWaitingTasks.empty())
927 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
928 if(mRunningTasks.empty())
930 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
931 if(mCompletedTasks.empty())
933 mProcessorRegistered = false;
934 Dali::Adaptor::Get().UnregisterProcessor(*this);
938 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
942 void AsyncTaskManager::TasksCompleted()
944 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
945 while(AsyncTaskPtr task = PopNextCompletedTask())
947 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
948 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
950 // Remove TasksCompleted callback trace
951 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
953 mTasksCompletedImpl->RemoveTaskTrace(task);
957 UnregisterProcessor();
958 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
960 mTasksCompletedImpl->EmitCompletedTasks();
963 void AsyncTaskManager::Process(bool postProcessor)
968 /// Worker thread called
969 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
971 // Lock while popping task out from the queue
972 Mutex::ScopedLock lock(mWaitingTasksMutex);
974 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
976 // pop out the next task from the queue
977 AsyncTaskPtr nextTask = nullptr;
979 // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
980 if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
983 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
985 if(mAvaliableLowPriorityTaskCounts == 0u)
987 // There are no avaliabe tasks to run now. Return nullptr.
992 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
994 if((*iter)->IsReady())
996 const auto priorityType = (*iter)->GetPriorityType();
997 bool taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
1000 // For thread safety
1001 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1003 taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
1010 // Add Running queue
1012 // Lock while popping task out from the queue
1013 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1015 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
1017 auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
1018 CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
1020 CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
1021 mWaitingTasks.erase(iter);
1023 // Decrease avaliable task counts if it is low priority
1024 if(priorityType == AsyncTask::PriorityType::LOW)
1026 // We are under running task mutex. We can decrease it.
1027 --mAvaliableLowPriorityTaskCounts;
1031 if(priorityType == AsyncTask::PriorityType::HIGH)
1033 // Decrease the number of waiting tasks for high priority.
1034 --mWaitingHighProirityTaskCounts;
1041 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
1046 /// Worker thread called
1047 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
1049 bool notify = false;
1053 bool needTrigger = false;
1055 // Lock while check validation of task.
1057 Mutex::ScopedLock lock(mRunningTasksMutex);
1059 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1060 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1062 const auto cacheIter = mapIter->second.begin();
1063 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1065 const auto iter = *cacheIter;
1066 DALI_ASSERT_DEBUG(iter->first == task);
1067 if(iter->second == RunningTaskState::RUNNING)
1069 // This task is valid.
1074 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
1077 // We should execute this tasks complete callback out of mutex
1078 if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
1080 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
1081 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
1083 // We need to remove task trace now.
1084 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
1086 mTasksCompletedImpl->RemoveTaskTrace(task);
1088 if(mTasksCompletedImpl->IsExecuteCallbackExist())
1090 // We need to call EmitCompletedTasks(). Trigger main thread.
1096 // Lock while adding task to the queue
1098 Mutex::ScopedLock lock(mRunningTasksMutex);
1100 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1101 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1103 const auto cacheIter = mapIter->second.begin();
1104 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1106 const auto iter = *cacheIter;
1107 const auto priorityType = iter->first->GetPriorityType();
1108 // Increase avaliable task counts if it is low priority
1109 if(priorityType == AsyncTask::PriorityType::LOW)
1111 // We are under running task mutex. We can increase it.
1112 ++mAvaliableLowPriorityTaskCounts;
1115 // Move task into completed, for ensure that AsyncTask destroy at main thread.
1117 Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
1119 const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
1121 needTrigger |= callbackRequired;
1123 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
1125 auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1126 CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
1128 CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
1129 mRunningTasks.erase(iter);
1133 needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1136 // Now, task is invalidate.
1142 // Wake up the main thread
1145 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
1146 mTrigger->Trigger();
1151 // AsyncTaskManager::TaskHelper
1153 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1154 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1158 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1159 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1163 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1164 : mProcessor(std::move(processor)),
1165 mAsyncTaskManager(asyncTaskManager)
1169 bool AsyncTaskManager::TaskHelper::Request()
1171 return mProcessor->Request();
1173 } // namespace Adaptor
1175 } // namespace Internal