2 * Copyright (c) 2024 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <dali/internal/system/common/async-task-manager-impl.h>
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
28 #include <unordered_map>
38 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Forcibly trigger TasksCompleted() when too many completed tasks have accumulated.
// Default size of the worker thread pool, and the environment variable that can override it
// (see GetNumberOfThreads for the accepted range).
40 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
41 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
43 // Default number of threads available to low priority tasks, and the environment variable that can override it.
44 constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
45 constexpr auto NUMBER_OF_LOW_PRIORITY_THREADS_ENV = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
47 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
49 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
50 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
51 constexpr auto MAX_NUMBER_OF_THREADS = 16u;
52 DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
53 return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
56 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
58 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
59 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
60 DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
61 return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
// Debug-only globals used by the DALI_LOG_INFO calls in this file.
64 #if defined(DEBUG_ENABLED)
// Log filter passed to every DALI_LOG_INFO call below; created with Debug::NoLogging.
65 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
// Counter read and post-incremented in AsyncTaskThread::Run() to build a per-thread debug name.
// NOTE(review): the increment is not synchronized; presumably acceptable because it is debug only - confirm.
67 uint32_t gThreadId = 0u; // Only for debug
71 * @brief Get the Task Name.
72 * Note that we can get const char* from std::string_view as data() since it will be const class.
74 * @param task The task what we want to get the name.
75 * @return The name of task, or (nil) if task is invalid.
// The result is only passed to "%s" format arguments of DALI_LOG_INFO in this file.
// NOTE(review): std::string_view::data() is not guaranteed to be null-terminated; this presumably relies on
// task->GetTaskName() viewing a null-terminated string - confirm against AsyncTask.
77 const char* GetTaskName(AsyncTaskPtr task)
// A null task yields the literal "(nil)" instead of dereferencing it.
80 return task ? task->GetTaskName().data() : "(nil)";
83 } // unnamed namespace
// Constructs a worker bound to the given manager. The thread is not started here:
// mIsThreadStarted is initialized to false and only set by Request().
87 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
// Keep a reference to the manager that supplies tasks (PopNextTaskToProcess) and receives results (CompleteTask).
89 mAsyncTaskManager(asyncTaskManager),
// Log / trace factories are fetched from the Adaptor here and installed later inside Run().
90 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
91 mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
92 mDestroyThread(false),
93 mIsThreadStarted(false),
// Requests the worker loop to stop: sets mDestroyThread under the conditional-wait lock and
// notifies, so a thread blocked in Run()'s Wait() wakes up and can observe the flag.
98 AsyncTaskThread::~AsyncTaskThread()
102 ConditionalWait::ScopedLock lock(mConditionalWait);
103 mDestroyThread = true;
104 mConditionalWait.Notify(lock);
// Asks this worker to pick up work. Called by AsyncTaskManager::AddTask(), which tests the returned bool.
// NOTE(review): the return statements are not visible here; presumably true means this thread was woken - confirm.
110 bool AsyncTaskThread::Request()
// First request only: remember that the thread has been started.
112 if(!mIsThreadStarted)
115 mIsThreadStarted = true;
119 // Lock while adding task to the queue
120 ConditionalWait::ScopedLock lock(mConditionalWait);
// Clear the idle flag before notifying; Run() sets it back to true just before it waits again.
124 mIsThreadIdle = false;
126 // wake up the thread
127 mConditionalWait.Notify(lock);
// Worker loop: names the thread, installs the log / trace functions, then repeatedly pops a task
// from the manager until mDestroyThread is set. The visible statements show the loop waiting on
// mConditionalWait, and handing tasks back to the manager with CompleteTask().
135 void AsyncTaskThread::Run()
137 #if defined(DEBUG_ENABLED)
// Debug builds give every worker a distinct index taken from the global counter.
138 uint32_t threadId = gThreadId++;
141 snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
145 SetThreadName("AsyncTaskThread");
147 mLogFactory.InstallLogFunction();
148 mTraceFactory.InstallTraceFunction();
150 while(!mDestroyThread)
152 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
// Waiting path: mark this thread idle and block until Request() or the destructor notifies.
155 ConditionalWait::ScopedLock lock(mConditionalWait);
158 mIsThreadIdle = true;
159 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
160 mConditionalWait.Wait(lock);
161 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
// Processing path: the two log lines bracket the task's processing.
166 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p][%s]\n", threadId, task.Get(), GetTaskName(task));
168 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p][%s]\n", threadId, task.Get(), GetTaskName(task));
// Ownership of the task reference is moved to the manager.
171 mAsyncTaskManager.CompleteTask(std::move(task));
177 // AsyncTaskManager::TasksCompletedImpl
// Bookkeeping for "tasks completed" callbacks.
// Each TasksCompletedId maps to a CallbackData (the callback plus a per-task reference count) in
// mTasksCompletedCallbackList. Once an id has no traced task left, its callback is moved into
// mExcuteCallbackList, and EmitCompletedTasks() executes the queued callbacks.
// Lock order: mExcuteCallbacksMutex may be taken while mTasksCompletedCallbacksMutex is held.
179 struct AsyncTaskManager::TasksCompletedImpl
181 TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
184 mEmitCompletedTaskTriggered(false)
190 * @brief Create new tasks completed id and.
191 * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
192 * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
// The raw callback pointer is wrapped by CallbackData into a std::unique_ptr, so this object owns it afterwards.
194 Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
196 // Lock while adding tasks completed callback list to the queue
197 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
// Ids come from a post-incremented counter; the assert guards against reusing an id that is still registered.
199 auto id = mTasksCompletedCount++;
200 DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
202 mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
204 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "GenerateTasksCompletedId id[%u] callback[%p]\n", id, callback);
209 * @brief Append task that will be trace.
210 * @post RemoveTaskTrace should be called.
211 * @param[in] id The id of tasks completed.
212 * @param[in] task The task want to trace.
214 void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
216 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AppendTaskTrace id[%u] task[%p][%s]\n", id, task.Get(), GetTaskName(task));
218 // Lock while adding tasks completed callback list to the queue
219 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
221 auto iter = mTasksCompletedCallbackList.find(id);
222 if(iter == mTasksCompletedCallbackList.end())
224 // This task is already erased. Ignore.
228 auto& callbackData = iter->second;
// mTasks is keyed by the raw task pointer and stores how many times the task is traced for this id.
230 auto jter = callbackData.mTasks.find(task.Get());
232 if(jter != callbackData.mTasks.end())
234 // Increase reference count.
// First trace of this task for the id: start its count at 1.
239 callbackData.mTasks.insert({task.Get(), 1u});
244 * @brief Remove all task that were traced.
245 * @param[in] task The task want to remove trace.
246 * @param[in] taskCount The number of tasks that will be removed.
// Note: the parameter described as taskCount in the documentation is named count in the signature.
248 void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
254 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace task[%p][%s] remove count[%u]\n", task.Get(), GetTaskName(task), count);
256 // Lock while removing tasks completed callback list to the queue
257 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
// Walk every registered id, because the same task can be traced by several ids.
// The loop has no increment expression: the iterator is advanced inside the body (erase returns the next one).
259 for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
261 auto& callbackData = iter->second;
262 bool eraseCallbackData = false;
264 auto jter = callbackData.mTasks.find(task.Get());
266 if(jter != callbackData.mTasks.end())
268 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task[%p][%s], current refcount[%u]\n", iter->first, task.Get(), GetTaskName(task), (jter->second));
// The remaining count is fully consumed: stop tracing this task for the id.
270 if(jter->second <= count)
272 callbackData.mTasks.erase(jter);
274 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task erased. remained tasks[%zu]", iter->first, callbackData.mTasks.size());
// No traced task is left for this id, so its callback becomes ready to execute.
276 if(callbackData.mTasks.empty())
278 eraseCallbackData = true;
280 // Move callback base into list.
281 // (To avoid task container changed during callback emit)
282 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
284 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
286 iter = mTasksCompletedCallbackList.erase(iter);
// Otherwise only reduce the reference count and keep tracing the task.
291 jter->second -= count;
295 if(!eraseCallbackData)
303 * @brief Check whether current TasksCompletedId completed or not.
304 * @param[in] id The id of tasks completed.
305 * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
307 bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
309 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CheckTasksCompletedCallbackCompleted[%u]\n", id);
311 // Lock while removing tasks completed callback list to the queue
312 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
314 auto iter = mTasksCompletedCallbackList.find(id);
315 if(iter != mTasksCompletedCallbackList.end())
317 auto& callbackData = iter->second;
// An id with no traced task has nothing to wait for: queue its callback and drop the entry.
318 if(callbackData.mTasks.empty())
320 // Move callback base into list.
321 // (To avoid task container changed during callback emit)
322 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
324 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
326 iter = mTasksCompletedCallbackList.erase(iter);
336 * @brief Remove taskS completed callbacks by id.
337 * @param[in] id The id of taskS completed.
338 * @return True if taskS completed id removed. False otherwise.
340 bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
342 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTasksCompleted[%u]\n", id);
344 // Lock while removing taskS completed callback list to the queue
345 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
347 auto iter = mTasksCompletedCallbackList.find(id);
348 if(iter == mTasksCompletedCallbackList.end())
350 // This task is already erased, or completed.
351 // Erase from the list of completed callbacks that are waiting to be executed.
353 // Lock while removing from the execute callback list
// This takes mExcuteCallbacksMutex while mTasksCompletedCallbacksMutex is still held (the documented lock order).
354 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
356 for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
358 if(iter->second == id)
360 iter = mExcuteCallbackList.erase(iter);
370 // This task is already erased and completed. Ignore.
// The id is still being traced: drop its entry, which destroys the owned callback without executing it.
374 mTasksCompletedCallbackList.erase(iter);
380 * @brief Emit all completed callbacks.
381 * @note This API should be called at event thread.
383 void EmitCompletedTasks()
385 ExecuteCallbackContainer executeCallbackList;
387 // Lock while taking the execute callback list out of the queue
388 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
// Reset the flag so that the next RegisterTasksCompletedCallback() sees it as not yet triggered.
390 mEmitCompletedTaskTriggered = false;
392 // Move the callback list out, so the callbacks can be executed outside of the mutex
393 executeCallbackList = std::move(mExcuteCallbackList);
// A moved-from vector is in a valid but unspecified state; clear() makes it explicitly empty.
394 mExcuteCallbackList.clear();
397 if(!executeCallbackList.empty())
399 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback count[%zu]\n", executeCallbackList.size());
400 // Execute all callbacks
401 for(auto&& callbackPair : executeCallbackList)
403 auto& callback = callbackPair.first;
404 auto id = callbackPair.second;
406 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute taskS completed callback[%p] for id[%u]\n", callback.get(), id);
// The callback receives the id it was registered with.
408 Dali::CallbackBase::Execute(*callback, id);
411 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback end\n");
416 * @brief Check whether there is some completed signal what we need to trace, or not.
417 * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
419 bool IsTasksCompletedCallbackExist()
421 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
422 return !mTasksCompletedCallbackList.empty();
426 * @brief Check whether there is some completed signal what we need to execute, or not.
427 * @return True if mExcuteCallbackList is not empty. False otherwise.
429 bool IsExecuteCallbackExist()
431 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
432 return !mExcuteCallbackList.empty();
// Queues a ready callback together with its id for a later EmitCompletedTasks() call.
// It is called from RemoveTaskTrace() and CheckTasksCompletedCallbackCompleted() while they hold mTasksCompletedCallbacksMutex.
436 void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
438 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted[%u] need to be execute with callback[%p]\n", id, callback.get());
440 // Lock while adding to the execute callback list
441 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
443 mExcuteCallbackList.emplace_back(std::move(callback), id);
// Only the first registration since the last EmitCompletedTasks() enters this branch.
// NOTE(review): presumably mTrigger is fired here to wake the event thread - confirm.
445 if(!mEmitCompletedTaskTriggered)
447 mEmitCompletedTaskTriggered = true;
449 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger processor\n");
// CallbackData: move-only holder of one callback and the tasks (with reference counts) it is waiting for.
458 CallbackData(CallbackBase* callback)
459 : mCallback(callback),
464 CallbackData(CallbackData&& rhs) noexcept
465 : mCallback(std::move(rhs.mCallback)),
466 mTasks(std::move(rhs.mTasks))
470 CallbackData& operator=(CallbackData&& rhs) noexcept
474 mCallback = std::move(rhs.mCallback);
475 mTasks = std::move(rhs.mTasks);
482 // Delete copy operator.
483 CallbackData(const CallbackData& rhs) = delete;
484 CallbackData& operator=(const CallbackData& rhs) = delete;
487 std::unique_ptr<CallbackBase> mCallback;
488 std::unordered_map<const AsyncTask*, uint32_t> mTasks;
492 AsyncTaskManager& mManager; ///< Owner of this TasksCompletedImpl.
493 EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
// Source of new ids; post-incremented by GenerateTasksCompletedId().
495 Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
497 using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
498 TasksCompletedContainer mTasksCompletedCallbackList;
500 using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
501 ExecuteCallbackContainer mExcuteCallbackList;
503 Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
504 Dali::Mutex mExcuteCallbacksMutex; ///< Mutex for mExcuteCallbackList.
// Set by RegisterTasksCompletedCallback() and cleared by EmitCompletedTasks(), both under mExcuteCallbacksMutex.
506 bool mEmitCompletedTaskTriggered : 1;
509 // AsyncTaskManager::CacheImpl
// Lookup caches for the manager's three task containers.
// Each cache maps a raw task pointer to the list of iterators that refer to that task inside the
// matching container, so all entries of a task can be found without scanning the container.
// The helpers do not lock; the caller must hold the mutex that protects the matching container.
511 struct AsyncTaskManager::CacheImpl
513 CacheImpl(AsyncTaskManager& manager)
519 // Insert / Erase task cache API.
522 * @brief Insert cache that input task.
523 * @pre Mutex be locked.
525 template<typename CacheContainer, typename Iterator>
526 static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
// operator[] creates an empty iterator list the first time a task is seen.
528 auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
// Append at the end, so the iterators stay in insertion order.
529 cacheContainer.insert(cacheContainer.end(), iterator);
533 * @brief Erase cache that input task.
534 * @pre Mutex be locked.
536 template<typename CacheContainer, typename Iterator>
537 static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
539 auto mapIter = cacheMap.find(task.Get());
540 if(mapIter != cacheMap.end())
542 auto& cacheContainer = (*mapIter).second;
// Linear search for the one cached iterator that equals the given iterator.
543 auto cacheIter = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
545 if(cacheIter != cacheContainer.end())
547 cacheContainer.erase(cacheIter);
// Remove the map entry as well once the task has no cached iterator left.
548 if(cacheContainer.empty())
550 cacheMap.erase(mapIter);
557 * @brief Erase all cache that input task.
558 * @pre Mutex be locked.
560 template<typename CacheContainer>
561 static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
// Drops the whole iterator list of the task at once; a task that is not cached is ignored.
563 auto mapIter = cacheMap.find(task.Get());
564 if(mapIter != cacheMap.end())
566 cacheMap.erase(mapIter);
571 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
573 // Keep cache iterators as list since we take tasks by FIFO as default.
574 using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
575 using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
576 using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
578 TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
579 RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
580 CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
// Returns the AsyncTaskManager handle registered with the SingletonService, creating and
// registering a new manager when none is registered yet.
585 Dali::AsyncTaskManager AsyncTaskManager::Get()
// Starts as an empty handle.
587 Dali::AsyncTaskManager manager;
588 SingletonService singletonService(SingletonService::Get());
591 // Check whether the async task manager is already created
592 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
595 // If so, downcast the handle of singleton
596 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
601 // If not, create the async task manager and register it as a singleton
602 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
// NOTE(review): the raw pointer is handed to the handle constructor; presumably the handle takes ownership - confirm.
603 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
// typeid(manager) is typeid(Dali::AsyncTaskManager), the same key that is used for the lookup.
604 singletonService.Register(typeid(manager), manager);
// Builds the worker pool and the helper objects. No processor is registered yet (mProcessorRegistered is false).
610 AsyncTaskManager::AsyncTaskManager()
// Pool size comes from the environment (or the default); the factory lambda creates one TaskHelper per element.
611 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
// The low priority budget is capped by the pool size.
// NOTE(review): this reads mTasks, so it relies on mTasks being declared before this member - confirm in the header.
612 mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
613 mWaitingHighProirityTaskCounts(0u),
// The trigger is bound to TasksCompleted().
614 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
// TasksCompletedImpl receives the trigger as a non-owning raw pointer.
615 mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get())),
616 mCacheImpl(new CacheImpl(*this)),
617 mProcessorRegistered(false)
// Tear-down: unregisters the processor if it is still registered and the Adaptor is available,
// releases the helper objects and finally clears the three task containers.
621 AsyncTaskManager::~AsyncTaskManager()
623 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
625 mProcessorRegistered = false;
626 Dali::Adaptor::Get().UnregisterProcessor(*this);
632 // Remove task completed impl and cache impl after all threads are join.
633 mTasksCompletedImpl.reset();
636 // Remove tasks after CacheImpl removed
637 mWaitingTasks.clear();
638 mRunningTasks.clear();
639 mCompletedTasks.clear();
// Appends a task to the waiting queue (and its cache), counts it if it is high priority,
// then asks the worker helpers one by one to pick up work.
642 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
646 // Lock while adding task to the queue
647 Mutex::ScopedLock lock(mWaitingTasksMutex);
649 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p][%s]\n", task.Get(), GetTaskName(task));
651 // push back into waiting queue.
652 auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
// Remember the new iterator, so RemoveTask() can find every waiting entry of this task.
653 CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
655 if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
657 // Increase the number of waiting tasks for high priority.
658 ++mWaitingHighProirityTaskCounts;
663 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
665 // Finish all Running threads are working
// True when there are at least as many running tasks as worker helpers.
666 if(mRunningTasks.size() >= mTasks.GetElementCount())
// Try each helper at most once.
673 size_t count = mTasks.GetElementCount();
675 while(index++ < count)
677 auto processHelperIt = mTasks.GetNext();
678 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
// NOTE(review): presumably the loop stops as soon as one helper accepts the request - confirm.
679 if(processHelperIt->Request())
683 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
686 // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
// Removes every occurrence of a task from the manager:
//  - waiting entries are erased,
//  - running entries cannot be erased, so they are marked as CANCELED,
//  - completed entries are erased.
// Each of the three containers is handled under its own mutex. Afterwards the task's
// tasks-completed traces are removed, and the processor is unregistered if nothing is left.
692 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
696 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p][%s]\n", task.Get(), GetTaskName(task));
698 // Check whether we need to unregister processor.
699 // If there is some non-empty queue exist, we don't need to unregister processor.
700 bool needCheckUnregisterProcessor = true;
// Number of entries whose trace must be removed; passed to RemoveTaskTrace() at the end.
702 uint32_t removedCount = 0u;
705 // Lock while remove task from the queue
706 Mutex::ScopedLock lock(mWaitingTasksMutex);
708 auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
709 if(mapIter != mCacheImpl->mWaitingTasksCache.end())
// The cache holds one iterator per waiting entry of this task.
711 for(auto& iterator : mapIter->second)
713 DALI_ASSERT_DEBUG((*iterator) == task);
714 if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
716 // Decrease the number of waiting tasks for high priority.
717 --mWaitingHighProirityTaskCounts;
719 mWaitingTasks.erase(iterator);
// The cached iterators are invalid now, so drop the whole cache entry.
722 CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
725 if(!mWaitingTasks.empty())
727 needCheckUnregisterProcessor = false;
732 // Lock while remove task from the queue
733 Mutex::ScopedLock lock(mRunningTasksMutex);
735 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
736 if(mapIter != mCacheImpl->mRunningTasksCache.end())
738 for(auto& iterator : mapIter->second)
740 DALI_ASSERT_DEBUG((*iterator).first == task);
741 // We cannot erase container. Just mark as canceled.
742 // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
743 if((*iterator).second == RunningTaskState::RUNNING)
745 (*iterator).second = RunningTaskState::CANCELED;
751 if(!mRunningTasks.empty())
753 needCheckUnregisterProcessor = false;
758 // Lock while remove task from the queue
759 Mutex::ScopedLock lock(mCompletedTasksMutex);
761 auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
762 if(mapIter != mCacheImpl->mCompletedTasksCache.end())
764 for(auto& iterator : mapIter->second)
766 DALI_ASSERT_DEBUG((*iterator).first == task);
767 if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
771 mCompletedTasks.erase(iterator);
773 CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
776 if(!mCompletedTasks.empty())
778 needCheckUnregisterProcessor = false;
782 // Remove TasksCompleted callback trace
// IsTasksCompletedCallbackExist() is checked first, so the trace walk is skipped when no id is registered.
783 if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
785 mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
788 // UnregisterProcessor required to lock mutex. Call this API only if required.
789 if(needCheckUnregisterProcessor)
791 UnregisterProcessor();
// Registers a callback that is tied to all tasks currently known to the manager that match the mask.
// A new id is generated for the callback, then the waiting, running and completed containers are
// scanned under all three mutexes, and each matching task is traced for that id.
// @param[in] callback Callback to register; ownership passes to mTasksCompletedImpl.
// @param[in] mask     Combination of thread / priority bits; a task matches when all of its own bits are set in the mask.
// @return The id generated for this callback.
796 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
798 // mTasksCompletedImpl will take ownership of callback.
799 Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
801 bool taskAdded = false; ///< Flag whether at least one task tracing now.
803 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "SetCompletedCallback id : %u, mask : %d\n", tasksCompletedId, static_cast<int32_t>(mask));
805 // Please be careful the order of mutex, to avoid dead lock.
// Lock order used throughout this file: waiting -> running -> completed.
807 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
809 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
811 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
813 // Collect all tasks from waiting tasks
814 for(auto& task : mWaitingTasks)
// Build the task's own mask: one thread bit (MAIN or WORKER) OR-ed with one priority bit (HIGH or LOW).
816 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
817 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
// The task matches only when every bit of its own mask is present in the requested mask.
819 if((checkMask & mask) == checkMask)
822 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
826 // Collect all tasks from running tasks
827 for(auto& taskPair : mRunningTasks)
829 // Trace only if it is running now.
830 if(taskPair.second == RunningTaskState::RUNNING)
832 auto& task = taskPair.first;
833 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
834 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
836 if((checkMask & mask) == checkMask)
839 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
844 // Collect all tasks from complete tasks
845 for(auto& taskPair : mCompletedTasks)
847 // Trace only if it is need callback.
848 // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
849 // If worker thread invocation, than it already remove trace at completed timing.
850 // If canceled cases, we don't append trace at running tasks already.
851 // So, we don't need to trace for SKIP_CALLBACK cases.
852 if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
854 auto& task = taskPair.first;
855 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
856 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
858 if((checkMask & mask) == checkMask)
861 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
869 // If there is nothing to check task, just execute callback right now.
872 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompletedCallback id[%u] executed now due to no task exist\n", tasksCompletedId);
// With no traced task, this call queues the callback for execution immediately.
874 mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
876 return tasksCompletedId;
879 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
881 return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
// Pops entries from the front of the completed queue under mCompletedTasksMutex.
// An entry that requires a callback becomes the result. The other entries are parked in
// ignoredTaskList, which is declared before the lock so their references are released after the lock.
// The result stays nullptr when no popped entry requires a callback.
884 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
886 std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
888 AsyncTaskPtr nextCompletedTask = nullptr;
890 // Lock while popping task out from the queue
891 Mutex::ScopedLock lock(mCompletedTasksMutex);
893 while(!mCompletedTasks.empty())
895 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
// Copy the task and its state before the entry is erased.
897 auto next = mCompletedTasks.begin();
898 AsyncTaskPtr nextTask = next->first;
899 CompletedTaskState taskState = next->second;
// Keep the cache in sync with the container: remove the cached iterator first, then the entry itself.
900 CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
901 mCompletedTasks.erase(next);
903 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p][%s] (callback required? : %d)\n", nextTask.Get(), GetTaskName(nextTask), taskState == CompletedTaskState::REQUIRE_CALLBACK);
905 if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
907 nextCompletedTask = nextTask;
// Entries that do not require a callback are only kept alive until the lock has been released.
911 ignoredTaskList.push_back(nextTask);
914 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p][%s]\n", nextCompletedTask.Get(), GetTaskName(nextCompletedTask));
917 return nextCompletedTask;
920 void AsyncTaskManager::RegisterProcessor()
922 if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
924 Dali::Adaptor::Get().RegisterProcessor(*this);
925 mProcessorRegistered = true;
929 void AsyncTaskManager::UnregisterProcessor()
931 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
933 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
934 // Keep processor at least 1 task exist.
935 // Please be careful the order of mutex, to avoid dead lock.
936 // TODO : Should we lock all mutex rightnow?
937 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
938 if(mWaitingTasks.empty())
940 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
941 if(mRunningTasks.empty())
943 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
944 if(mCompletedTasks.empty())
946 mProcessorRegistered = false;
947 Dali::Adaptor::Get().UnregisterProcessor(*this);
951 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
955 void AsyncTaskManager::TasksCompleted()
957 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
958 while(AsyncTaskPtr task = PopNextCompletedTask())
960 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p][%s]\n", task.Get(), GetTaskName(task));
961 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
963 // Remove TasksCompleted callback trace
964 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
966 mTasksCompletedImpl->RemoveTaskTrace(task);
970 UnregisterProcessor();
971 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
973 mTasksCompletedImpl->EmitCompletedTasks();
// Processor hook of this manager. The manager registers itself with the Adaptor through
// RegisterProcessor(*this), so presumably the Adaptor invokes this - confirm against the Processor interface.
// NOTE(review): confirm whether postProcessor is intentionally unused.
976 void AsyncTaskManager::Process(bool postProcessor)
981 /// Worker thread called
// Picks the next runnable task from the waiting queue and moves it to the running queue.
// A waiting task is considered only when it IsReady(). High priority tasks are always eligible;
// low priority tasks need a free low priority slot (mAvaliableLowPriorityTaskCounts > 0).
// A task that already has an entry in the running cache is not started a second time.
// mWaitingTasksMutex is held for the whole call; mRunningTasksMutex is taken in nested scopes where needed.
982 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
984 // Lock while popping task out from the queue
985 Mutex::ScopedLock lock(mWaitingTasksMutex);
987 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
989 // pop out the next task from the queue
990 AsyncTaskPtr nextTask = nullptr;
992 // Fast cut if all waiting tasks are LOW priority, and we cannot execute low task anymore.
993 if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
996 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
998 if(mAvaliableLowPriorityTaskCounts == 0u)
1000 // There are no available tasks to run now. Return nullptr.
// Scan the waiting queue from the front, so eligible tasks are taken in insertion order.
1005 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
1007 if((*iter)->IsReady())
1009 const auto priorityType = (*iter)->GetPriorityType();
1010 bool taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
1013 // For thread safety
1014 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1016 taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
1019 // Check whether we try to running same task at multiple threads.
1022 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1023 auto mapIter = mCacheImpl->mRunningTasksCache.find((*iter).Get());
1024 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1026 if(!mapIter->second.empty())
1028 // Some other thread running this tasks now. Ignore it.
1029 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Some other thread running this task [%p][%s]\n", (*iter).Get(), GetTaskName(*iter));
1030 taskAvaliable = false;
1039 // Add Running queue
1041 // Lock while popping task out from the queue
1042 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1044 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p][%s]\n", nextTask.Get(), GetTaskName(nextTask));
// Insert into the running queue (state RUNNING) and its cache, then remove from the waiting cache and queue.
1046 auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
1047 CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
1049 CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
// iter is invalid after this erase; priorityType was read before, so it can still be used below.
1050 mWaitingTasks.erase(iter);
1052 // Decrease available task counts if it is low priority
1053 if(priorityType == AsyncTask::PriorityType::LOW)
1055 // We are under running task mutex. We can decrease it.
1056 --mAvaliableLowPriorityTaskCounts;
1060 if(priorityType == AsyncTask::PriorityType::HIGH)
1062 // Decrease the number of waiting tasks for high priority.
1063 --mWaitingHighProirityTaskCounts;
1070 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p][%s]\n", nextTask.Get(), GetTaskName(nextTask));
/// Hands a task back to the manager from a worker thread.
///
/// Steps that are visible in this function:
///  - If the task's callback invocation thread is WORKER_THREAD, the running-task cache is
///    inspected under mRunningTasksMutex, and the task's completed callback is executed
///    outside of that mutex. The task trace is then removed from mTasksCompletedImpl when a
///    tasks-completed callback exists.
///  - Under mRunningTasksMutex (with mCompletedTasksMutex nested inside it), the task is
///    appended to mCompletedTasks as REQUIRE_CALLBACK or SKIP_CALLBACK, registered in the
///    completed-task cache, and erased from both mRunningTasks and the running-task cache.
///    A LOW priority task also increments mAvaliableLowPriorityTaskCounts.
///  - The main thread is woken up through mTrigger.
///
/// NOTE(review): the statements that set `notify` / `needTrigger`, and the conditions that
/// guard the callback execution and the final trigger, are not visible here — confirm them
/// against the complete function before relying on the summary above.
///
/// @param[in] task Handle of the task being completed.
1075 /// Called from a worker thread.
1076 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
1080 bool needTrigger = false;
1082 // Check now whether the completed callback has to be executed here, which is the case for worker-thread callbacks.
1083 if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
1085 bool notify = false;
1087 // Lock while checking that the task is still registered as RUNNING.
1089 Mutex::ScopedLock lock(mRunningTasksMutex);
1091 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1092 if(mapIter != mCacheImpl->mRunningTasksCache.end())
// The cache entry holds iterators into mRunningTasks; the first one is used, and it must exist.
1094 const auto cacheIter = mapIter->second.begin();
1095 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1097 const auto iter = *cacheIter;
1098 DALI_ASSERT_DEBUG(iter->first == task);
1099 if(iter->second == RunningTaskState::RUNNING)
1101 // This task is valid. (Presumably `notify` is set here — the statement is not visible; confirm.)
1106 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p][%s] (is notify? : %d)\n", task.Get(), GetTaskName(task), notify);
1109 // The task's completed callback must be executed outside of the mutex.
1112 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p][%s]\n", task.Get(), GetTaskName(task));
1113 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
1115 // The task trace has to be removed now, if anybody registered a tasks-completed callback.
1116 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
1118 mTasksCompletedImpl->RemoveTaskTrace(task);
1120 if(mTasksCompletedImpl->IsExecuteCallbackExist())
1122 // EmitCompletedTasks() needs to be called, so the main thread has to be triggered.
1129 // Lock while moving the task from the running queue to the completed queue.
1131 bool notify = false;
1133 Mutex::ScopedLock lock(mRunningTasksMutex);
1135 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1136 if(mapIter != mCacheImpl->mRunningTasksCache.end())
// Same lookup as above: take the first cached iterator into mRunningTasks for this task.
1138 const auto cacheIter = mapIter->second.begin();
1139 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1141 const auto iter = *cacheIter;
1143 DALI_ASSERT_DEBUG(iter->first == task);
1144 if(iter->second == RunningTaskState::RUNNING)
1146 // This task is valid. (Presumably `notify` is set here — the statement is not visible; confirm.)
1150 const auto priorityType = iter->first->GetPriorityType();
1151 // Increase the available task count if the task is low priority.
1152 if(priorityType == AsyncTask::PriorityType::LOW)
1154 // We hold the running-task mutex, so the counter can be increased safely.
1155 ++mAvaliableLowPriorityTaskCounts;
1158 // Move the task into the completed list, to ensure that the AsyncTask is destroyed on the main thread.
1160 Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
// A main-thread callback is required only for a still-valid task whose callback runs on the main thread.
1162 const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
1164 needTrigger |= callbackRequired;
1166 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p][%s] (callback required? : %d)\n", task.Get(), GetTaskName(task), callbackRequired);
// Append to the completed list and cache the new position, then drop the running-list entry and its cache record.
1168 auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1169 CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
1171 CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
1172 mRunningTasks.erase(iter);
// Also request a trigger once the completed list has grown to FORCE_TRIGGER_THRESHOLD entries.
1176 needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1179 // From now on, the task is invalidated.
1185 // Wake up the main thread.
1188 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
1189 mTrigger->Trigger();
1194 // AsyncTaskManager::TaskHelper
1196 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1197 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1201 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1202 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1206 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1207 : mProcessor(std::move(processor)),
1208 mAsyncTaskManager(asyncTaskManager)
1212 bool AsyncTaskManager::TaskHelper::Request()
1214 return mProcessor->Request();
1216 } // namespace Adaptor
1218 } // namespace Internal