2 * Copyright (c) 2024 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <dali/internal/system/common/async-task-manager-impl.h>
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
29 #include <dali/internal/system/common/environment-variables.h>
31 #include <unordered_map>
41 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
43 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
45 // The number of threads for low priority task.
46 constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
48 size_t GetNumberOfThreads(size_t defaultValue)
50 auto numberString = EnvironmentVariable::GetEnvironmentVariable(DALI_ENV_ASYNC_MANAGER_THREAD_POOL_SIZE);
51 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
52 constexpr auto MAX_NUMBER_OF_THREADS = 16u;
53 DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
54 return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
57 size_t GetNumberOfLowPriorityThreads(size_t defaultValue, size_t maxValue)
59 auto numberString = EnvironmentVariable::GetEnvironmentVariable(DALI_ENV_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE);
60 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
61 DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
62 return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
65 #if defined(DEBUG_ENABLED)
66 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
68 uint32_t gThreadId = 0u; // Only for debug
72 * @brief Get the Task Name.
73 * Note that we can get const char* from std::string_view as data() since it will be const class.
75 * @param task The task what we want to get the name.
76 * @return The name of task, or (nil) if task is invalid.
78 const char* GetTaskName(AsyncTaskPtr task)
81 return task ? task->GetTaskName().data() : "(nil)";
84 } // unnamed namespace
88 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
90 mAsyncTaskManager(asyncTaskManager),
91 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
92 mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
93 mDestroyThread(false),
94 mIsThreadStarted(false),
99 AsyncTaskThread::~AsyncTaskThread()
103 ConditionalWait::ScopedLock lock(mConditionalWait);
104 mDestroyThread = true;
105 mConditionalWait.Notify(lock);
111 bool AsyncTaskThread::Request()
113 if(!mIsThreadStarted)
116 mIsThreadStarted = true;
120 // Lock while adding task to the queue
121 ConditionalWait::ScopedLock lock(mConditionalWait);
125 mIsThreadIdle = false;
127 // wake up the thread
128 mConditionalWait.Notify(lock);
136 void AsyncTaskThread::Run()
138 #if defined(DEBUG_ENABLED)
139 uint32_t threadId = gThreadId++;
142 snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
146 SetThreadName("AsyncTaskThread");
148 mLogFactory.InstallLogFunction();
149 mTraceFactory.InstallTraceFunction();
151 while(!mDestroyThread)
153 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
156 ConditionalWait::ScopedLock lock(mConditionalWait);
159 mIsThreadIdle = true;
160 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
161 mConditionalWait.Wait(lock);
162 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
167 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p][%s]\n", threadId, task.Get(), GetTaskName(task));
169 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p][%s]\n", threadId, task.Get(), GetTaskName(task));
172 mAsyncTaskManager.CompleteTask(std::move(task));
178 // AsyncTaskManager::TasksCompletedImpl
180 struct AsyncTaskManager::TasksCompletedImpl
182 TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
185 mEmitCompletedTaskTriggered(false)
191 * @brief Create new tasks completed id and.
192 * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
193 * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
195 Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
197 // Lock while adding tasks completed callback list to the queue
198 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
200 auto id = mTasksCompletedCount++;
201 DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
203 mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
205 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "GenerateTasksCompletedId id[%u] callback[%p]\n", id, callback);
210 * @brief Append task that will be trace.
211 * @post RemoveTaskTrace should be called.
212 * @param[in] id The id of tasks completed.
213 * @param[in] task The task want to trace.
215 void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
217 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AppendTaskTrace id[%u] task[%p][%s]\n", id, task.Get(), GetTaskName(task));
219 // Lock while adding tasks completed callback list to the queue
220 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
222 auto iter = mTasksCompletedCallbackList.find(id);
223 if(iter == mTasksCompletedCallbackList.end())
225 // This task is already erased. Ignore.
229 auto& callbackData = iter->second;
231 auto jter = callbackData.mTasks.find(task.Get());
233 if(jter != callbackData.mTasks.end())
235 // Increase reference count.
240 callbackData.mTasks.insert({task.Get(), 1u});
245 * @brief Remove all task that were traced.
246 * @param[in] task The task want to remove trace.
247 * @param[in] taskCount The number of tasks that will be removed.
249 void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
255 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace task[%p][%s] remove count[%u]\n", task.Get(), GetTaskName(task), count);
257 // Lock while removing tasks completed callback list to the queue
258 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
260 for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
262 auto& callbackData = iter->second;
263 bool eraseCallbackData = false;
265 auto jter = callbackData.mTasks.find(task.Get());
267 if(jter != callbackData.mTasks.end())
269 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task[%p][%s], current refcount[%u]\n", iter->first, task.Get(), GetTaskName(task), (jter->second));
271 if(jter->second <= count)
273 callbackData.mTasks.erase(jter);
275 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task erased. remained tasks[%zu]", iter->first, callbackData.mTasks.size());
277 if(callbackData.mTasks.empty())
279 eraseCallbackData = true;
281 // Move callback base into list.
282 // (To avoid task container changed during callback emit)
283 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
285 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
287 iter = mTasksCompletedCallbackList.erase(iter);
292 jter->second -= count;
296 if(!eraseCallbackData)
304 * @brief Check whether current TasksCompletedId completed or not.
305 * @param[in] id The id of tasks completed.
306 * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
308 bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
310 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CheckTasksCompletedCallbackCompleted[%u]\n", id);
312 // Lock while removing tasks completed callback list to the queue
313 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
315 auto iter = mTasksCompletedCallbackList.find(id);
316 if(iter != mTasksCompletedCallbackList.end())
318 auto& callbackData = iter->second;
319 if(callbackData.mTasks.empty())
321 // Move callback base into list.
322 // (To avoid task container changed during callback emit)
323 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
325 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
327 iter = mTasksCompletedCallbackList.erase(iter);
337 * @brief Remove taskS completed callbacks by id.
338 * @param[in] id The id of taskS completed.
339 * @return True if taskS completed id removed. False otherwise.
341 bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
343 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTasksCompleted[%u]\n", id);
345 // Lock while removing taskS completed callback list to the queue
346 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
348 auto iter = mTasksCompletedCallbackList.find(id);
349 if(iter == mTasksCompletedCallbackList.end())
351 // This task is already erased, or completed.
352 // Erase from completed excute callback list.
354 // Lock while removing excute callback list to the queue
355 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
357 for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
359 if(iter->second == id)
361 iter = mExcuteCallbackList.erase(iter);
371 // This task is alread erased and completed. Ignore.
375 mTasksCompletedCallbackList.erase(iter);
381 * @brief Emit all completed callbacks.
382 * @note This API should be called at event thread.
384 void EmitCompletedTasks()
386 ExecuteCallbackContainer executeCallbackList;
388 // Lock while removing excute callback list to the queue
389 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
391 mEmitCompletedTaskTriggered = false;
393 // Copy callback lists, for let we execute callbacks out of mutex
394 executeCallbackList = std::move(mExcuteCallbackList);
395 mExcuteCallbackList.clear();
398 if(!executeCallbackList.empty())
400 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback count[%zu]\n", executeCallbackList.size());
401 // Execute all callbacks
402 for(auto&& callbackPair : executeCallbackList)
404 auto& callback = callbackPair.first;
405 auto id = callbackPair.second;
407 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute taskS completed callback[%p] for id[%u]\n", callback.get(), id);
409 Dali::CallbackBase::Execute(*callback, id);
412 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback end\n");
417 * @brief Check whether there is some completed signal what we need to trace, or not.
418 * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
420 bool IsTasksCompletedCallbackExist()
422 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
423 return !mTasksCompletedCallbackList.empty();
427 * @brief Check whether there is some completed signal what we need to execute, or not.
428 * @return True if mExcuteCallbackList is not empty. False otherwise.
430 bool IsExecuteCallbackExist()
432 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
433 return !mExcuteCallbackList.empty();
437 void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
439 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted[%u] need to be execute with callback[%p]\n", id, callback.get());
441 // Lock while adding excute callback list to the queue
442 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
444 mExcuteCallbackList.emplace_back(std::move(callback), id);
446 if(!mEmitCompletedTaskTriggered)
448 mEmitCompletedTaskTriggered = true;
450 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger processor\n");
459 CallbackData(CallbackBase* callback)
460 : mCallback(callback),
465 CallbackData(CallbackData&& rhs) noexcept
466 : mCallback(std::move(rhs.mCallback)),
467 mTasks(std::move(rhs.mTasks))
471 CallbackData& operator=(CallbackData&& rhs) noexcept
475 mCallback = std::move(rhs.mCallback);
476 mTasks = std::move(rhs.mTasks);
483 // Delete copy operator.
484 CallbackData(const CallbackData& rhs) = delete;
485 CallbackData& operator=(const CallbackData& rhs) = delete;
488 std::unique_ptr<CallbackBase> mCallback;
489 std::unordered_map<const AsyncTask*, uint32_t> mTasks;
493 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
494 EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
496 Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
498 using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
499 TasksCompletedContainer mTasksCompletedCallbackList;
501 using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
502 ExecuteCallbackContainer mExcuteCallbackList;
504 Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
505 Dali::Mutex mExcuteCallbacksMutex; ///< Mutex for mExcuteCallbackList.
507 bool mEmitCompletedTaskTriggered : 1;
510 // AsyncTaskManager::CacheImpl
512 struct AsyncTaskManager::CacheImpl
514 CacheImpl(AsyncTaskManager& manager)
520 // Insert / Erase task cache API.
523 * @brief Insert cache that input task.
524 * @pre Mutex be locked.
526 template<typename CacheContainer, typename Iterator>
527 static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
529 auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
530 cacheContainer.insert(cacheContainer.end(), iterator);
534 * @brief Erase cache that input task.
535 * @pre Mutex be locked.
537 template<typename CacheContainer, typename Iterator>
538 static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
540 auto mapIter = cacheMap.find(task.Get());
541 if(mapIter != cacheMap.end())
543 auto& cacheContainer = (*mapIter).second;
544 auto cacheIter = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
546 if(cacheIter != cacheContainer.end())
548 cacheContainer.erase(cacheIter);
549 if(cacheContainer.empty())
551 cacheMap.erase(mapIter);
558 * @brief Erase all cache that input task.
559 * @pre Mutex be locked.
561 template<typename CacheContainer>
562 static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
564 auto mapIter = cacheMap.find(task.Get());
565 if(mapIter != cacheMap.end())
567 cacheMap.erase(mapIter);
572 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
574 // Keep cache iterators as list since we take tasks by FIFO as default.
575 using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
576 using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
577 using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
579 TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
580 RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
581 CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
586 Dali::AsyncTaskManager AsyncTaskManager::Get()
588 Dali::AsyncTaskManager manager;
589 SingletonService singletonService(SingletonService::Get());
592 // Check whether the async task manager is already created
593 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
596 // If so, downcast the handle of singleton
597 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
602 // If not, create the async task manager and register it as a singleton
603 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
604 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
605 singletonService.Register(typeid(manager), manager);
611 AsyncTaskManager::AsyncTaskManager()
612 : mTasks(GetNumberOfThreads(DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
613 mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
614 mWaitingHighProirityTaskCounts(0u),
615 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
616 mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get())),
617 mCacheImpl(new CacheImpl(*this)),
618 mProcessorRegistered(false)
622 AsyncTaskManager::~AsyncTaskManager()
624 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
626 mProcessorRegistered = false;
627 Dali::Adaptor::Get().UnregisterProcessor(*this);
633 // Remove task completed impl and cache impl after all threads are join.
634 mTasksCompletedImpl.reset();
637 // Remove tasks after CacheImpl removed
638 mWaitingTasks.clear();
639 mRunningTasks.clear();
640 mCompletedTasks.clear();
643 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
647 // Lock while adding task to the queue
648 Mutex::ScopedLock lock(mWaitingTasksMutex);
650 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p][%s]\n", task.Get(), GetTaskName(task));
652 // push back into waiting queue.
653 auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
654 CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
656 if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
658 // Increase the number of waiting tasks for high priority.
659 ++mWaitingHighProirityTaskCounts;
664 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
666 // Finish all Running threads are working
667 if(mRunningTasks.size() >= mTasks.GetElementCount())
675 Mutex::ScopedLock lock(mTasksMutex);
676 size_t count = mTasks.GetElementCount();
678 while(index++ < count)
680 auto processHelperIt = mTasks.GetNext();
681 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
682 if(processHelperIt->Request())
686 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
690 // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
696 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
700 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p][%s]\n", task.Get(), GetTaskName(task));
702 // Check whether we need to unregister processor.
703 // If there is some non-empty queue exist, we don't need to unregister processor.
704 bool needCheckUnregisterProcessor = true;
706 uint32_t removedCount = 0u;
709 // Lock while remove task from the queue
710 Mutex::ScopedLock lock(mWaitingTasksMutex);
712 auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
713 if(mapIter != mCacheImpl->mWaitingTasksCache.end())
715 for(auto& iterator : mapIter->second)
717 DALI_ASSERT_DEBUG((*iterator) == task);
718 if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH && mWaitingHighProirityTaskCounts > 0u)
720 // Decrease the number of waiting tasks for high priority.
721 --mWaitingHighProirityTaskCounts;
723 mWaitingTasks.erase(iterator);
726 CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
729 if(!mWaitingTasks.empty())
731 needCheckUnregisterProcessor = false;
736 // Lock while remove task from the queue
737 Mutex::ScopedLock lock(mRunningTasksMutex);
739 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
740 if(mapIter != mCacheImpl->mRunningTasksCache.end())
742 for(auto& iterator : mapIter->second)
744 DALI_ASSERT_DEBUG((*iterator).first == task);
745 // We cannot erase container. Just mark as canceled.
746 // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
747 if((*iterator).second == RunningTaskState::RUNNING)
749 (*iterator).second = RunningTaskState::CANCELED;
755 if(!mRunningTasks.empty())
757 needCheckUnregisterProcessor = false;
762 // Lock while remove task from the queue
763 Mutex::ScopedLock lock(mCompletedTasksMutex);
765 auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
766 if(mapIter != mCacheImpl->mCompletedTasksCache.end())
768 for(auto& iterator : mapIter->second)
770 DALI_ASSERT_DEBUG((*iterator).first == task);
771 if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
775 mCompletedTasks.erase(iterator);
777 CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
780 if(!mCompletedTasks.empty())
782 needCheckUnregisterProcessor = false;
786 // Remove TasksCompleted callback trace
787 if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
789 mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
792 // UnregisterProcessor required to lock mutex. Call this API only if required.
793 if(needCheckUnregisterProcessor)
795 UnregisterProcessor();
800 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
802 // mTasksCompletedImpl will take ownership of callback.
803 Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
805 bool taskAdded = false; ///< Flag whether at least one task tracing now.
807 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "SetCompletedCallback id : %u, mask : %d\n", tasksCompletedId, static_cast<int32_t>(mask));
809 // Please be careful the order of mutex, to avoid dead lock.
811 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
813 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
815 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
817 // Collect all tasks from waiting tasks
818 for(auto& task : mWaitingTasks)
820 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
821 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
823 if((checkMask & mask) == checkMask)
826 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
830 // Collect all tasks from running tasks
831 for(auto& taskPair : mRunningTasks)
833 // Trace only if it is running now.
834 if(taskPair.second == RunningTaskState::RUNNING)
836 auto& task = taskPair.first;
837 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
838 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
840 if((checkMask & mask) == checkMask)
843 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
848 // Collect all tasks from complete tasks
849 for(auto& taskPair : mCompletedTasks)
851 // Trace only if it is need callback.
852 // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
853 // If worker thread invocation, than it already remove trace at completed timing.
854 // If canceled cases, we don't append trace at running tasks already.
855 // So, we don't need to trace for SKIP_CALLBACK cases.
856 if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
858 auto& task = taskPair.first;
859 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
860 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
862 if((checkMask & mask) == checkMask)
865 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
873 // If there is nothing to check task, just excute callback right now.
876 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompletedCallback id[%u] executed now due to no task exist\n", tasksCompletedId);
878 mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
880 return tasksCompletedId;
883 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
885 return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
888 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
890 std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
892 AsyncTaskPtr nextCompletedTask = nullptr;
894 // Lock while popping task out from the queue
895 Mutex::ScopedLock lock(mCompletedTasksMutex);
897 while(!mCompletedTasks.empty())
899 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
901 auto next = mCompletedTasks.begin();
902 AsyncTaskPtr nextTask = next->first;
903 CompletedTaskState taskState = next->second;
904 CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
905 mCompletedTasks.erase(next);
907 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p][%s] (callback required? : %d)\n", nextTask.Get(), GetTaskName(nextTask), taskState == CompletedTaskState::REQUIRE_CALLBACK);
909 if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
911 nextCompletedTask = nextTask;
915 ignoredTaskList.push_back(nextTask);
918 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p][%s]\n", nextCompletedTask.Get(), GetTaskName(nextCompletedTask));
921 return nextCompletedTask;
924 void AsyncTaskManager::RegisterProcessor()
926 if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
928 Dali::Adaptor::Get().RegisterProcessor(*this);
929 mProcessorRegistered = true;
933 void AsyncTaskManager::UnregisterProcessor()
935 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
937 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
938 // Keep processor at least 1 task exist.
939 // Please be careful the order of mutex, to avoid dead lock.
940 // TODO : Should we lock all mutex rightnow?
941 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
942 if(mWaitingTasks.empty())
944 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
945 if(mRunningTasks.empty())
947 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
948 if(mCompletedTasks.empty())
950 mProcessorRegistered = false;
951 Dali::Adaptor::Get().UnregisterProcessor(*this);
955 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
959 void AsyncTaskManager::TasksCompleted()
961 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
962 while(AsyncTaskPtr task = PopNextCompletedTask())
964 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p][%s]\n", task.Get(), GetTaskName(task));
965 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
967 // Remove TasksCompleted callback trace
968 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
970 mTasksCompletedImpl->RemoveTaskTrace(task);
974 UnregisterProcessor();
975 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
977 mTasksCompletedImpl->EmitCompletedTasks();
980 void AsyncTaskManager::Process(bool postProcessor)
985 /// Worker thread called
986 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
988 // Lock while popping task out from the queue
989 Mutex::ScopedLock lock(mWaitingTasksMutex);
991 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
993 // pop out the next task from the queue
994 AsyncTaskPtr nextTask = nullptr;
996 // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
997 if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
1000 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1002 if(mAvaliableLowPriorityTaskCounts == 0u)
1004 // There are no avaliabe tasks to run now. Return nullptr.
1009 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
1011 if((*iter)->IsReady())
1013 const auto priorityType = (*iter)->GetPriorityType();
1014 bool taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
1017 // For thread safety
1018 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1020 taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
1023 // Check whether we try to running same task at multiple threads.
1026 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1027 auto mapIter = mCacheImpl->mRunningTasksCache.find((*iter).Get());
1028 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1030 if(!mapIter->second.empty())
1032 // Some other thread running this tasks now. Ignore it.
1033 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Some other thread running this task [%p][%s]\n", (*iter).Get(), GetTaskName(*iter));
1034 taskAvaliable = false;
1043 // Add Running queue
1045 // Lock while popping task out from the queue
1046 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1048 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p][%s]\n", nextTask.Get(), GetTaskName(nextTask));
1050 auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
1051 CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
1053 CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
1054 mWaitingTasks.erase(iter);
1056 // Decrease avaliable task counts if it is low priority
1057 if(priorityType == AsyncTask::PriorityType::LOW)
1059 // We are under running task mutex. We can decrease it.
1060 --mAvaliableLowPriorityTaskCounts;
1064 if(priorityType == AsyncTask::PriorityType::HIGH && mWaitingHighProirityTaskCounts > 0u)
1066 // Decrease the number of waiting tasks for high priority.
1067 --mWaitingHighProirityTaskCounts;
1074 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p][%s]\n", nextTask.Get(), GetTaskName(nextTask));
1079 /// Worker thread called
1080 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
1084 bool needTrigger = false;
1086 // Check now whether we need to execute callback or not, for worker thread cases.
1087 if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
1089 bool notify = false;
1091 // Lock while check validation of task.
1093 Mutex::ScopedLock lock(mRunningTasksMutex);
1095 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1096 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1098 const auto cacheIter = mapIter->second.begin();
1099 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1101 const auto iter = *cacheIter;
1102 DALI_ASSERT_DEBUG(iter->first == task);
1103 if(iter->second == RunningTaskState::RUNNING)
1105 // This task is valid.
1110 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p][%s] (is notify? : %d)\n", task.Get(), GetTaskName(task), notify);
1113 // We should execute this tasks complete callback out of mutex
1116 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p][%s]\n", task.Get(), GetTaskName(task));
1117 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
1119 // We need to remove task trace now.
1120 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
1122 mTasksCompletedImpl->RemoveTaskTrace(task);
1124 if(mTasksCompletedImpl->IsExecuteCallbackExist())
1126 // We need to call EmitCompletedTasks(). Trigger main thread.
1133 // Lock while adding task to the queue
1135 bool notify = false;
1137 Mutex::ScopedLock lock(mRunningTasksMutex);
1139 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1140 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1142 const auto cacheIter = mapIter->second.begin();
1143 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1145 const auto iter = *cacheIter;
1147 DALI_ASSERT_DEBUG(iter->first == task);
1148 if(iter->second == RunningTaskState::RUNNING)
1150 // This task is valid.
1154 const auto priorityType = iter->first->GetPriorityType();
1155 // Increase avaliable task counts if it is low priority
1156 if(priorityType == AsyncTask::PriorityType::LOW)
1158 // We are under running task mutex. We can increase it.
1159 ++mAvaliableLowPriorityTaskCounts;
1162 // Move task into completed, for ensure that AsyncTask destroy at main thread.
1164 Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
1166 const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
1168 needTrigger |= callbackRequired;
1170 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p][%s] (callback required? : %d)\n", task.Get(), GetTaskName(task), callbackRequired);
1172 auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1173 CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
1175 CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
1176 mRunningTasks.erase(iter);
1180 needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1183 // Now, task is invalidate.
1189 // Wake up the main thread
1192 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
1193 mTrigger->Trigger();
1198 // AsyncTaskManager::TaskHelper
1200 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1201 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1205 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1206 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1210 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1211 : mProcessor(std::move(processor)),
1212 mAsyncTaskManager(asyncTaskManager)
1216 bool AsyncTaskManager::TaskHelper::Request()
1218 return mProcessor->Request();
1220 } // namespace Adaptor
1222 } // namespace Internal