2 * Copyright (c) 2023 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <dali/internal/system/common/async-task-manager-impl.h>
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
27 #include <dali/integration-api/trace.h>
29 #include <unordered_map>
39 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
41 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
42 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
44 // The number of threads for low priority task.
45 constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
46 constexpr auto NUMBER_OF_LOW_PRIORITY_THREADS_ENV = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
48 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
50 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
51 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
52 constexpr auto MAX_NUMBER_OF_THREADS = 16u;
53 DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
54 return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
57 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
59 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
60 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
61 DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
62 return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
65 #if defined(DEBUG_ENABLED)
66 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
68 uint32_t gThreadId = 0u; // Only for debug
71 DALI_INIT_TRACE_FILTER(gTraceFilter, DALI_TRACE_PERFORMANCE_MARKER, false);
74 * @brief Get the Task Name.
75 * Note that we can get const char* from std::string_view as data() since it will be const class.
77 * @param task The task what we want to get the name.
78 * @return The name of task, or (nil) if task is invalid.
80 const char* GetTaskName(AsyncTaskPtr task)
83 return task ? task->GetTaskName().data() : "(nil)";
86 } // unnamed namespace
90 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
92 mAsyncTaskManager(asyncTaskManager),
93 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
94 mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
95 mDestroyThread(false),
96 mIsThreadStarted(false),
101 AsyncTaskThread::~AsyncTaskThread()
105 ConditionalWait::ScopedLock lock(mConditionalWait);
106 mDestroyThread = true;
107 mConditionalWait.Notify(lock);
113 bool AsyncTaskThread::Request()
115 if(!mIsThreadStarted)
118 mIsThreadStarted = true;
122 // Lock while adding task to the queue
123 ConditionalWait::ScopedLock lock(mConditionalWait);
127 mIsThreadIdle = false;
129 // wake up the thread
130 mConditionalWait.Notify(lock);
138 void AsyncTaskThread::Run()
140 #if defined(DEBUG_ENABLED)
141 uint32_t threadId = gThreadId++;
144 snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
148 SetThreadName("AsyncTaskThread");
150 mLogFactory.InstallLogFunction();
151 mTraceFactory.InstallTraceFunction();
153 while(!mDestroyThread)
155 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
158 ConditionalWait::ScopedLock lock(mConditionalWait);
161 mIsThreadIdle = true;
162 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
163 mConditionalWait.Wait(lock);
164 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
169 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p][%s]\n", threadId, task.Get(), GetTaskName(task));
170 DALI_TRACE_BEGIN(gTraceFilter, GetTaskName(task));
172 DALI_TRACE_END(gTraceFilter, GetTaskName(task));
173 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p][%s]\n", threadId, task.Get(), GetTaskName(task));
176 mAsyncTaskManager.CompleteTask(std::move(task));
182 // AsyncTaskManager::TasksCompletedImpl
184 struct AsyncTaskManager::TasksCompletedImpl
186 TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
189 mEmitCompletedTaskTriggered(false)
195 * @brief Create new tasks completed id and.
196 * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
197 * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
199 Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
201 // Lock while adding tasks completed callback list to the queue
202 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
204 auto id = mTasksCompletedCount++;
205 DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
207 mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
209 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "GenerateTasksCompletedId id[%u] callback[%p]\n", id, callback);
214 * @brief Append task that will be trace.
215 * @post RemoveTaskTrace should be called.
216 * @param[in] id The id of tasks completed.
217 * @param[in] task The task want to trace.
219 void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
221 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AppendTaskTrace id[%u] task[%p][%s]\n", id, task.Get(), GetTaskName(task));
223 // Lock while adding tasks completed callback list to the queue
224 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
226 auto iter = mTasksCompletedCallbackList.find(id);
227 if(iter == mTasksCompletedCallbackList.end())
229 // This task is already erased. Ignore.
233 auto& callbackData = iter->second;
235 auto jter = callbackData.mTasks.find(task.Get());
237 if(jter != callbackData.mTasks.end())
239 // Increase reference count.
244 callbackData.mTasks.insert({task.Get(), 1u});
249 * @brief Remove all task that were traced.
250 * @param[in] task The task want to remove trace.
251 * @param[in] taskCount The number of tasks that will be removed.
253 void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
259 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace task[%p][%s] remove count[%u]\n", task.Get(), GetTaskName(task), count);
261 // Lock while removing tasks completed callback list to the queue
262 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
264 for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
266 auto& callbackData = iter->second;
267 bool eraseCallbackData = false;
269 auto jter = callbackData.mTasks.find(task.Get());
271 if(jter != callbackData.mTasks.end())
273 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task[%p][%s], current refcount[%u]\n", iter->first, task.Get(), GetTaskName(task), (jter->second));
275 if(jter->second <= count)
277 callbackData.mTasks.erase(jter);
279 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task erased. remained tasks[%zu]", iter->first, callbackData.mTasks.size());
281 if(callbackData.mTasks.empty())
283 eraseCallbackData = true;
285 // Move callback base into list.
286 // (To avoid task container changed during callback emit)
287 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
289 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
291 iter = mTasksCompletedCallbackList.erase(iter);
296 jter->second -= count;
300 if(!eraseCallbackData)
308 * @brief Check whether current TasksCompletedId completed or not.
309 * @param[in] id The id of tasks completed.
310 * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
312 bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
314 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CheckTasksCompletedCallbackCompleted[%u]\n", id);
316 // Lock while removing tasks completed callback list to the queue
317 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
319 auto iter = mTasksCompletedCallbackList.find(id);
320 if(iter != mTasksCompletedCallbackList.end())
322 auto& callbackData = iter->second;
323 if(callbackData.mTasks.empty())
325 // Move callback base into list.
326 // (To avoid task container changed during callback emit)
327 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
329 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
331 iter = mTasksCompletedCallbackList.erase(iter);
341 * @brief Remove taskS completed callbacks by id.
342 * @param[in] id The id of taskS completed.
343 * @return True if taskS completed id removed. False otherwise.
345 bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
347 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTasksCompleted[%u]\n", id);
349 // Lock while removing taskS completed callback list to the queue
350 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
352 auto iter = mTasksCompletedCallbackList.find(id);
353 if(iter == mTasksCompletedCallbackList.end())
355 // This task is already erased, or completed.
356 // Erase from completed excute callback list.
358 // Lock while removing excute callback list to the queue
359 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
361 for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
363 if(iter->second == id)
365 iter = mExcuteCallbackList.erase(iter);
375 // This task is alread erased and completed. Ignore.
379 mTasksCompletedCallbackList.erase(iter);
385 * @brief Emit all completed callbacks.
386 * @note This API should be called at event thread.
388 void EmitCompletedTasks()
390 ExecuteCallbackContainer executeCallbackList;
392 // Lock while removing excute callback list to the queue
393 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
395 mEmitCompletedTaskTriggered = false;
397 // Copy callback lists, for let we execute callbacks out of mutex
398 executeCallbackList = std::move(mExcuteCallbackList);
399 mExcuteCallbackList.clear();
402 if(!executeCallbackList.empty())
404 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback count[%zu]\n", executeCallbackList.size());
405 // Execute all callbacks
406 for(auto&& callbackPair : executeCallbackList)
408 auto& callback = callbackPair.first;
409 auto id = callbackPair.second;
411 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute taskS completed callback[%p] for id[%u]\n", callback.get(), id);
413 Dali::CallbackBase::Execute(*callback, id);
416 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback end\n");
421 * @brief Check whether there is some completed signal what we need to trace, or not.
422 * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
424 bool IsTasksCompletedCallbackExist()
426 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
427 return !mTasksCompletedCallbackList.empty();
431 * @brief Check whether there is some completed signal what we need to execute, or not.
432 * @return True if mExcuteCallbackList is not empty. False otherwise.
434 bool IsExecuteCallbackExist()
436 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
437 return !mExcuteCallbackList.empty();
441 void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
443 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted[%u] need to be execute with callback[%p]\n", id, callback.get());
445 // Lock while adding excute callback list to the queue
446 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
448 mExcuteCallbackList.emplace_back(std::move(callback), id);
450 if(!mEmitCompletedTaskTriggered)
452 mEmitCompletedTaskTriggered = true;
454 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger processor\n");
463 CallbackData(CallbackBase* callback)
464 : mCallback(callback),
469 CallbackData(CallbackData&& rhs) noexcept
470 : mCallback(std::move(rhs.mCallback)),
471 mTasks(std::move(rhs.mTasks))
475 CallbackData& operator=(CallbackData&& rhs) noexcept
479 mCallback = std::move(rhs.mCallback);
480 mTasks = std::move(rhs.mTasks);
487 // Delete copy operator.
488 CallbackData(const CallbackData& rhs) = delete;
489 CallbackData& operator=(const CallbackData& rhs) = delete;
492 std::unique_ptr<CallbackBase> mCallback;
493 std::unordered_map<const AsyncTask*, uint32_t> mTasks;
497 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
498 EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
500 Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
502 using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
503 TasksCompletedContainer mTasksCompletedCallbackList;
505 using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
506 ExecuteCallbackContainer mExcuteCallbackList;
508 Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
509 Dali::Mutex mExcuteCallbacksMutex; ///< Mutex for mExcuteCallbackList.
511 bool mEmitCompletedTaskTriggered : 1;
514 // AsyncTaskManager::CacheImpl
516 struct AsyncTaskManager::CacheImpl
518 CacheImpl(AsyncTaskManager& manager)
524 // Insert / Erase task cache API.
527 * @brief Insert cache that input task.
528 * @pre Mutex be locked.
530 template<typename CacheContainer, typename Iterator>
531 static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
533 auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
534 cacheContainer.insert(cacheContainer.end(), iterator);
538 * @brief Erase cache that input task.
539 * @pre Mutex be locked.
541 template<typename CacheContainer, typename Iterator>
542 static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
544 auto mapIter = cacheMap.find(task.Get());
545 if(mapIter != cacheMap.end())
547 auto& cacheContainer = (*mapIter).second;
548 auto cacheIter = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
550 if(cacheIter != cacheContainer.end())
552 cacheContainer.erase(cacheIter);
553 if(cacheContainer.empty())
555 cacheMap.erase(mapIter);
562 * @brief Erase all cache that input task.
563 * @pre Mutex be locked.
565 template<typename CacheContainer>
566 static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
568 auto mapIter = cacheMap.find(task.Get());
569 if(mapIter != cacheMap.end())
571 cacheMap.erase(mapIter);
576 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
578 // Keep cache iterators as list since we take tasks by FIFO as default.
579 using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
580 using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
581 using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
583 TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
584 RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
585 CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
590 Dali::AsyncTaskManager AsyncTaskManager::Get()
592 Dali::AsyncTaskManager manager;
593 SingletonService singletonService(SingletonService::Get());
596 // Check whether the async task manager is already created
597 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
600 // If so, downcast the handle of singleton
601 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
606 // If not, create the async task manager and register it as a singleton
607 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
608 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
609 singletonService.Register(typeid(manager), manager);
615 AsyncTaskManager::AsyncTaskManager()
616 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
617 mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
618 mWaitingHighProirityTaskCounts(0u),
619 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
620 mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get())),
621 mCacheImpl(new CacheImpl(*this)),
622 mProcessorRegistered(false)
626 AsyncTaskManager::~AsyncTaskManager()
628 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
630 mProcessorRegistered = false;
631 Dali::Adaptor::Get().UnregisterProcessor(*this);
637 // Remove task completed impl and cache impl after all threads are join.
638 mTasksCompletedImpl.reset();
641 // Remove tasks after CacheImpl removed
642 mWaitingTasks.clear();
643 mRunningTasks.clear();
644 mCompletedTasks.clear();
647 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
651 // Lock while adding task to the queue
652 Mutex::ScopedLock lock(mWaitingTasksMutex);
654 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p][%s]\n", task.Get(), GetTaskName(task));
656 // push back into waiting queue.
657 auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
658 CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
660 if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
662 // Increase the number of waiting tasks for high priority.
663 ++mWaitingHighProirityTaskCounts;
668 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
670 // Finish all Running threads are working
671 if(mRunningTasks.size() >= mTasks.GetElementCount())
678 size_t count = mTasks.GetElementCount();
680 while(index++ < count)
682 auto processHelperIt = mTasks.GetNext();
683 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
684 if(processHelperIt->Request())
688 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
691 // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
697 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
701 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p][%s]\n", task.Get(), GetTaskName(task));
703 // Check whether we need to unregister processor.
704 // If there is some non-empty queue exist, we don't need to unregister processor.
705 bool needCheckUnregisterProcessor = true;
707 uint32_t removedCount = 0u;
710 // Lock while remove task from the queue
711 Mutex::ScopedLock lock(mWaitingTasksMutex);
713 auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
714 if(mapIter != mCacheImpl->mWaitingTasksCache.end())
716 for(auto& iterator : mapIter->second)
718 DALI_ASSERT_DEBUG((*iterator) == task);
719 if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
721 // Decrease the number of waiting tasks for high priority.
722 --mWaitingHighProirityTaskCounts;
724 mWaitingTasks.erase(iterator);
727 CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
730 if(!mWaitingTasks.empty())
732 needCheckUnregisterProcessor = false;
737 // Lock while remove task from the queue
738 Mutex::ScopedLock lock(mRunningTasksMutex);
740 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
741 if(mapIter != mCacheImpl->mRunningTasksCache.end())
743 for(auto& iterator : mapIter->second)
745 DALI_ASSERT_DEBUG((*iterator).first == task);
746 // We cannot erase container. Just mark as canceled.
747 // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
748 if((*iterator).second == RunningTaskState::RUNNING)
750 (*iterator).second = RunningTaskState::CANCELED;
756 if(!mRunningTasks.empty())
758 needCheckUnregisterProcessor = false;
763 // Lock while remove task from the queue
764 Mutex::ScopedLock lock(mCompletedTasksMutex);
766 auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
767 if(mapIter != mCacheImpl->mCompletedTasksCache.end())
769 for(auto& iterator : mapIter->second)
771 DALI_ASSERT_DEBUG((*iterator).first == task);
772 if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
776 mCompletedTasks.erase(iterator);
778 CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
781 if(!mCompletedTasks.empty())
783 needCheckUnregisterProcessor = false;
787 // Remove TasksCompleted callback trace
788 if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
790 mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
793 // UnregisterProcessor required to lock mutex. Call this API only if required.
794 if(needCheckUnregisterProcessor)
796 UnregisterProcessor();
801 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
803 // mTasksCompletedImpl will take ownership of callback.
804 Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
806 bool taskAdded = false; ///< Flag whether at least one task tracing now.
808 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "SetCompletedCallback id : %u, mask : %d\n", tasksCompletedId, static_cast<int32_t>(mask));
810 // Please be careful the order of mutex, to avoid dead lock.
812 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
814 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
816 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
818 // Collect all tasks from waiting tasks
819 for(auto& task : mWaitingTasks)
821 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
822 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
824 if((checkMask & mask) == checkMask)
827 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
831 // Collect all tasks from running tasks
832 for(auto& taskPair : mRunningTasks)
834 // Trace only if it is running now.
835 if(taskPair.second == RunningTaskState::RUNNING)
837 auto& task = taskPair.first;
838 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
839 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
841 if((checkMask & mask) == checkMask)
844 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
849 // Collect all tasks from complete tasks
850 for(auto& taskPair : mCompletedTasks)
852 // Trace only if it is need callback.
853 // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
854 // If worker thread invocation, than it already remove trace at completed timing.
855 // If canceled cases, we don't append trace at running tasks already.
856 // So, we don't need to trace for SKIP_CALLBACK cases.
857 if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
859 auto& task = taskPair.first;
860 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
861 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
863 if((checkMask & mask) == checkMask)
866 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
874 // If there is nothing to check task, just excute callback right now.
877 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompletedCallback id[%u] executed now due to no task exist\n", tasksCompletedId);
879 mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
881 return tasksCompletedId;
884 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
886 return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
889 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
891 std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
893 AsyncTaskPtr nextCompletedTask = nullptr;
895 // Lock while popping task out from the queue
896 Mutex::ScopedLock lock(mCompletedTasksMutex);
898 while(!mCompletedTasks.empty())
900 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
902 auto next = mCompletedTasks.begin();
903 AsyncTaskPtr nextTask = next->first;
904 CompletedTaskState taskState = next->second;
905 CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
906 mCompletedTasks.erase(next);
908 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p][%s] (callback required? : %d)\n", nextTask.Get(), GetTaskName(nextTask), taskState == CompletedTaskState::REQUIRE_CALLBACK);
910 if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
912 nextCompletedTask = nextTask;
916 ignoredTaskList.push_back(nextTask);
919 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p][%s]\n", nextCompletedTask.Get(), GetTaskName(nextCompletedTask));
922 return nextCompletedTask;
925 void AsyncTaskManager::RegisterProcessor()
927 if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
929 Dali::Adaptor::Get().RegisterProcessor(*this);
930 mProcessorRegistered = true;
934 void AsyncTaskManager::UnregisterProcessor()
936 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
938 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
939 // Keep processor at least 1 task exist.
940 // Please be careful the order of mutex, to avoid dead lock.
941 // TODO : Should we lock all mutex rightnow?
942 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
943 if(mWaitingTasks.empty())
945 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
946 if(mRunningTasks.empty())
948 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
949 if(mCompletedTasks.empty())
951 mProcessorRegistered = false;
952 Dali::Adaptor::Get().UnregisterProcessor(*this);
956 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
960 void AsyncTaskManager::TasksCompleted()
962 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
963 while(AsyncTaskPtr task = PopNextCompletedTask())
965 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p][%s]\n", task.Get(), GetTaskName(task));
966 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
968 // Remove TasksCompleted callback trace
969 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
971 mTasksCompletedImpl->RemoveTaskTrace(task);
975 UnregisterProcessor();
976 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
978 mTasksCompletedImpl->EmitCompletedTasks();
981 void AsyncTaskManager::Process(bool postProcessor)
986 /// Worker thread called
987 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
989 // Lock while popping task out from the queue
990 Mutex::ScopedLock lock(mWaitingTasksMutex);
992 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
994 // pop out the next task from the queue
995 AsyncTaskPtr nextTask = nullptr;
997 // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
998 if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
1000 // For thread safety
1001 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1003 if(mAvaliableLowPriorityTaskCounts == 0u)
1005 // There are no avaliabe tasks to run now. Return nullptr.
1010 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
1012 if((*iter)->IsReady())
1014 const auto priorityType = (*iter)->GetPriorityType();
1015 bool taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
1018 // For thread safety
1019 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1021 taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
1024 // Check whether we try to running same task at multiple threads.
1027 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1028 auto mapIter = mCacheImpl->mRunningTasksCache.find((*iter).Get());
1029 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1031 if(!mapIter->second.empty())
1033 // Some other thread running this tasks now. Ignore it.
1034 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Some other thread running this task [%p][%s]\n", (*iter).Get(), GetTaskName(*iter));
1035 taskAvaliable = false;
1044 // Add Running queue
1046 // Lock while popping task out from the queue
1047 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1049 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p][%s]\n", nextTask.Get(), GetTaskName(nextTask));
1051 auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
1052 CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
1054 CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
1055 mWaitingTasks.erase(iter);
1057 // Decrease avaliable task counts if it is low priority
1058 if(priorityType == AsyncTask::PriorityType::LOW)
1060 // We are under running task mutex. We can decrease it.
1061 --mAvaliableLowPriorityTaskCounts;
1065 if(priorityType == AsyncTask::PriorityType::HIGH)
1067 // Decrease the number of waiting tasks for high priority.
1068 --mWaitingHighProirityTaskCounts;
1075 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p][%s]\n", nextTask.Get(), GetTaskName(nextTask));
1080 /// Worker thread called
1081 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
1083 bool notify = false;
1087 bool needTrigger = false;
1089 // Lock while check validation of task.
1091 Mutex::ScopedLock lock(mRunningTasksMutex);
1093 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1094 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1096 const auto cacheIter = mapIter->second.begin();
1097 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1099 const auto iter = *cacheIter;
1100 DALI_ASSERT_DEBUG(iter->first == task);
1101 if(iter->second == RunningTaskState::RUNNING)
1103 // This task is valid.
1108 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p][%s] (is notify? : %d)\n", task.Get(), GetTaskName(task), notify);
1111 // We should execute this tasks complete callback out of mutex
1112 if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
1114 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p][%s]\n", task.Get(), GetTaskName(task));
1115 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
1117 // We need to remove task trace now.
1118 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
1120 mTasksCompletedImpl->RemoveTaskTrace(task);
1122 if(mTasksCompletedImpl->IsExecuteCallbackExist())
1124 // We need to call EmitCompletedTasks(). Trigger main thread.
1130 // Lock while adding task to the queue
1132 Mutex::ScopedLock lock(mRunningTasksMutex);
1134 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1135 if(mapIter != mCacheImpl->mRunningTasksCache.end())
1137 const auto cacheIter = mapIter->second.begin();
1138 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1140 const auto iter = *cacheIter;
1141 const auto priorityType = iter->first->GetPriorityType();
1142 // Increase avaliable task counts if it is low priority
1143 if(priorityType == AsyncTask::PriorityType::LOW)
1145 // We are under running task mutex. We can increase it.
1146 ++mAvaliableLowPriorityTaskCounts;
1149 // Move task into completed, for ensure that AsyncTask destroy at main thread.
1151 Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
1153 const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
1155 needTrigger |= callbackRequired;
1157 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p][%s] (callback required? : %d)\n", task.Get(), GetTaskName(task), callbackRequired);
1159 auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1160 CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
1162 CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
1163 mRunningTasks.erase(iter);
1167 needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1170 // Now, task is invalidate.
1176 // Wake up the main thread
1179 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
1180 mTrigger->Trigger();
1185 // AsyncTaskManager::TaskHelper
1187 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1188 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1192 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1193 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1197 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1198 : mProcessor(std::move(processor)),
1199 mAsyncTaskManager(asyncTaskManager)
1203 bool AsyncTaskManager::TaskHelper::Request()
1205 return mProcessor->Request();
1207 } // namespace Adaptor
1209 } // namespace Internal