2 * Copyright (c) 2023 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <dali/internal/system/common/async-task-manager-impl.h>
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
28 #include <unordered_map>
/// Size of the completed-task queue at which the main thread is triggered even if no callback is pending.
constexpr auto FORCE_TRIGGER_THRESHOLD = 128u;

// The number of worker threads, and the environment variable that overrides it.
constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
constexpr auto NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";

// The number of low priority tasks allowed to run at once, and the environment variable that overrides it.
constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
constexpr auto NUMBER_OF_LOW_PRIORITY_THREADS_ENV     = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
47 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
49 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
50 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
51 constexpr auto MAX_NUMBER_OF_THREADS = 16u;
52 DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
53 return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
56 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
58 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
59 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
60 DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
61 return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
#if defined(DEBUG_ENABLED)
/// Log filter used by the DALI_LOG_INFO calls in this file; created with Debug::NoLogging.
Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");

/// Counter handed out to worker threads for their debug name and log lines. Only for debug.
/// NOTE(review): AsyncTaskThread::Run() post-increments this without synchronisation — confirm whether workers can start concurrently.
uint32_t gThreadId = 0u;
#endif
70 } // unnamed namespace
// Constructor: binds this worker thread to its owning AsyncTaskManager, takes the log and
// trace factories from the adaptor, and starts with mDestroyThread and mIsThreadStarted
// both false.
// NOTE(review): the embedded listing numbers jump (74 -> 76 and 80 -> 85), so part of the
// initializer list and the constructor body are not visible in this view — restore them
// from the upstream file before editing.
74 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
76 mAsyncTaskManager(asyncTaskManager),
// Both factories are fetched through Dali::Adaptor::Get() at construction time.
77 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
78 mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
79 mDestroyThread(false),
80 mIsThreadStarted(false),
// Destructor: while holding the mConditionalWait lock, sets mDestroyThread and notifies
// the condition so that a worker blocked in Run() wakes up and sees the flag.
// NOTE(review): listing lines after 91 are not visible here; presumably the thread is
// joined afterwards — confirm against the upstream file.
85 AsyncTaskThread::~AsyncTaskThread()
89 ConditionalWait::ScopedLock lock(mConditionalWait);
90 mDestroyThread = true;
91 mConditionalWait.Notify(lock);
// Asks this worker to pick up pending work.
// Visible steps: record that the thread has been started, then, under the
// mConditionalWait lock, clear mIsThreadIdle and notify the condition to wake the worker.
// NOTE(review): the guards around these statements and every return statement are missing
// from this view (listing lines 98-101, 108-110 and 115-121), so the conditions under
// which the bool result is true or false cannot be stated from here.
97 bool AsyncTaskThread::Request()
102 mIsThreadStarted = true;
106 // Lock while adding task to the queue
107 ConditionalWait::ScopedLock lock(mConditionalWait);
111 mIsThreadIdle = false;
113 // wake up the thread
114 mConditionalWait.Notify(lock);
// Thread entry point for a worker.
// Visible flow: name the thread, install the log and trace functions, then loop until
// mDestroyThread is set, asking the manager for the next task on every iteration. One path
// marks the thread idle and blocks on mConditionalWait; the other logs around the task's
// processing and hands the task back to the manager through CompleteTask().
// NOTE(review): the embedded listing numbers skip several lines in this function
// (126-127, 129-131, 133, 140-141, 143-144, 149-152, 154, 156-157), so the branch
// conditions, the declaration of `temp`, and the statement between the "Process task" and
// "Complete task" logs are not visible. Restore them from the upstream file before editing.
122 void AsyncTaskThread::Run()
124 #if defined(DEBUG_ENABLED)
// Debug builds take an id from the file-level counter and format it into the thread name.
125 uint32_t threadId = gThreadId++;
128 snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
// Presumably the non-debug branch of the #if above; the directive line is not visible — confirm.
132 SetThreadName("AsyncTaskThread");
134 mLogFactory.InstallLogFunction();
135 mTraceFactory.InstallTraceFunction();
137 while(!mDestroyThread)
139 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
// Idle path: the flag is set and the wait happens while holding the mConditionalWait lock.
142 ConditionalWait::ScopedLock lock(mConditionalWait);
145 mIsThreadIdle = true;
146 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
147 mConditionalWait.Wait(lock);
148 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
// Work path: the task is moved into CompleteTask(), so `task` must not be used afterwards.
153 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
155 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
158 mAsyncTaskManager.CompleteTask(std::move(task));
164 // AsyncTaskManager::CacheImpl
166 struct AsyncTaskManager::CacheImpl
168 CacheImpl(AsyncTaskManager& manager)
174 // Insert / Erase task cache API.
177 * @brief Insert cache that input task.
178 * @pre Mutex be locked.
180 template<typename CacheContainer, typename Iterator>
181 static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
183 auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
184 cacheContainer.insert(cacheContainer.end(), iterator);
188 * @brief Erase cache that input task.
189 * @pre Mutex be locked.
191 template<typename CacheContainer, typename Iterator>
192 static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
194 auto mapIter = cacheMap.find(task.Get());
195 if(mapIter != cacheMap.end())
197 auto& cacheContainer = (*mapIter).second;
198 auto cacheIter = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
200 if(cacheIter != cacheContainer.end())
202 cacheContainer.erase(cacheIter);
203 if(cacheContainer.empty())
205 cacheMap.erase(mapIter);
212 * @brief Erase all cache that input task.
213 * @pre Mutex be locked.
215 template<typename CacheContainer>
216 static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
218 auto mapIter = cacheMap.find(task.Get());
219 if(mapIter != cacheMap.end())
221 cacheMap.erase(mapIter);
226 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
228 // Keep cache iterators as list since we take tasks by FIFO as default.
229 using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
230 using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
231 using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
233 TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
234 RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
235 CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
// Returns a handle to the AsyncTaskManager singleton.
// Visible flow: obtain the SingletonService, look up a singleton registered under
// typeid(Dali::AsyncTaskManager) and downcast it; otherwise create a new internal manager,
// wrap it in a handle and register that handle with the service.
// NOTE(review): the conditions selecting between the two paths and the return statement
// are missing from this view (listing lines 244-245, 248-249, 252-255 and 260-263), so the
// exact fallback rule (for example what happens when the downcast fails) cannot be stated
// from here.
240 Dali::AsyncTaskManager AsyncTaskManager::Get()
242 Dali::AsyncTaskManager manager;
243 SingletonService singletonService(SingletonService::Get());
246 // Check whether the async task manager is already created
247 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
250 // If so, downcast the handle of singleton
251 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
256 // If not, create the async task manager and register it as a singleton
257 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
258 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
259 singletonService.Register(typeid(manager), manager);
265 AsyncTaskManager::AsyncTaskManager()
266 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
267 mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
268 mWaitingHighProirityTaskCounts(0u),
269 mCacheImpl(new CacheImpl(*this)),
270 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
271 mProcessorRegistered(false)
// Destructor.
// Visible steps: if this manager is still registered as a processor and the adaptor is
// available, clear the flag and unregister; finally clear the waiting, running and
// completed queues.
// NOTE(review): the statements that the two comments below refer to (joining the worker
// threads, releasing the CacheImpl) sit on listing lines 283-284 and 287, which are not
// visible in this view — confirm them against the upstream file.
275 AsyncTaskManager::~AsyncTaskManager()
277 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
279 mProcessorRegistered = false;
280 Dali::Adaptor::Get().UnregisterProcessor(*this);
286 // Remove cache impl after all threads are joined.
289 // Remove tasks after CacheImpl removed
290 mWaitingTasks.clear();
291 mRunningTasks.clear();
292 mCompletedTasks.clear();
// Queues a task for asynchronous processing.
// Visible flow:
//  1. Under mWaitingTasksMutex, append the task to mWaitingTasks, cache its position and,
//     for a HIGH priority task, bump the waiting high priority counter.
//  2. Still under that lock, take mRunningTasksMutex and compare the number of running
//     tasks with the number of worker helpers.
//  3. Walk the helpers (at most one full round) calling Request() on each.
//  4. Register this manager as an adaptor processor if it is not registered yet.
// NOTE(review): several lines are missing from this view (listing lines 296-298, 312-315,
// 320-325, 327, 333-335, 337-338, 344-348): any leading guard, the action taken when all
// workers are busy, the declaration of `index`, and the action taken when Request()
// succeeds. Do not infer them from this comment.
295 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
299 // Lock while adding task to the queue
300 Mutex::ScopedLock lock(mWaitingTasksMutex);
302 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
304 // push back into waiting queue.
305 auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
306 CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
308 if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
310 // Increase the number of waiting tasks for high priority.
311 ++mWaitingHighProirityTaskCounts;
316 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
318 // Check whether every worker thread is already running a task
319 if(mRunningTasks.size() >= mTasks.GetElementCount())
326 size_t count = mTasks.GetElementCount();
328 while(index++ < count)
330 auto processHelperIt = mTasks.GetNext();
331 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
332 if(processHelperIt->Request())
336 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
339 // Register the processor (mTrigger may fire too late if the event thread is busy with many events.)
340 if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
342 Dali::Adaptor::Get().RegisterProcessor(*this);
343 mProcessorRegistered = true;
// Removes every occurrence of a task from the manager.
// The three queues are handled one after another, each under its own mutex:
//  - waiting:   each cached position is erased from mWaitingTasks (decrementing the high
//               priority counter for HIGH tasks) and the cache entry is dropped;
//  - running:   entries cannot be erased, so each one is marked RunningTaskState::CANCELED;
//  - completed: each cached position is erased from mCompletedTasks and the cache entry is
//               dropped.
// If all three queues were seen empty, UnregisterProcessor() is called afterwards.
// NOTE(review): listing lines 350-352 are not visible; presumably a guard on `task` opens
// the function — confirm against the upstream file.
349 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
353 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
355 // Check whether we need to unregister processor.
356 // If any queue is still non-empty, we don't need to unregister the processor.
357 bool needCheckUnregisterProcessor = true;
360 // Lock while remove task from the queue
361 Mutex::ScopedLock lock(mWaitingTasksMutex);
363 auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
364 if(mapIter != mCacheImpl->mWaitingTasksCache.end())
366 for(auto& iterator : mapIter->second)
368 DALI_ASSERT_DEBUG((*iterator) == task);
369 if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
371 // Decrease the number of waiting tasks for high priority.
372 --mWaitingHighProirityTaskCounts;
374 mWaitingTasks.erase(iterator);
// The cached iterators are now stale, so the whole cache entry is dropped in one go.
376 CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
379 if(!mWaitingTasks.empty())
381 needCheckUnregisterProcessor = false;
386 // Lock while remove task from the queue
387 Mutex::ScopedLock lock(mRunningTasksMutex);
389 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
390 if(mapIter != mCacheImpl->mRunningTasksCache.end())
392 for(auto& iterator : mapIter->second)
394 DALI_ASSERT_DEBUG((*iterator).first == task);
395 // We cannot erase container. Just mark as canceled.
396 // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
397 (*iterator).second = RunningTaskState::CANCELED;
401 if(!mRunningTasks.empty())
403 needCheckUnregisterProcessor = false;
408 // Lock while remove task from the queue
409 Mutex::ScopedLock lock(mCompletedTasksMutex);
411 auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
412 if(mapIter != mCacheImpl->mCompletedTasksCache.end())
414 for(auto& iterator : mapIter->second)
416 DALI_ASSERT_DEBUG(iterator->first == task);
417 mCompletedTasks.erase(iterator);
419 CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
422 if(!mCompletedTasks.empty())
424 needCheckUnregisterProcessor = false;
428 // UnregisterProcessor required to lock mutex. Call this API only if required.
429 if(needCheckUnregisterProcessor)
431 UnregisterProcessor();
// Pops completed tasks from the front of mCompletedTasks, under mCompletedTasksMutex,
// looking for one whose state is CompletedTaskState::REQUIRE_CALLBACK; that task is what
// gets returned. Other popped tasks are parked in ignoredTaskList, which is declared
// before the lock so that their references are released only after the mutex is unlocked.
// Returns an empty pointer when no task requiring a callback was found.
// NOTE(review): listing lines 460-462 are not visible; presumably the loop exits as soon
// as a task requiring a callback is found — confirm against the upstream file.
436 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
438 std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
440 AsyncTaskPtr nextCompletedTask = nullptr;
442 // Lock while popping task out from the queue
443 Mutex::ScopedLock lock(mCompletedTasksMutex);
445 while(!mCompletedTasks.empty())
447 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
// Copy the task and its state out before the queue entry (and its cache record) is erased.
449 auto next = mCompletedTasks.begin();
450 AsyncTaskPtr nextTask = next->first;
451 CompletedTaskState taskState = next->second;
452 CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
453 mCompletedTasks.erase(next);
455 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
457 if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
459 nextCompletedTask = nextTask;
463 ignoredTaskList.push_back(nextTask);
466 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
469 return nextCompletedTask;
472 void AsyncTaskManager::UnregisterProcessor()
474 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
476 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
477 // Keep processor at least 1 task exist.
478 // Please be careful the order of mutex, to avoid dead lock.
479 // TODO : Should we lock all mutex rightnow?
480 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
481 if(mWaitingTasks.empty())
483 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
484 if(mRunningTasks.empty())
486 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
487 if(mCompletedTasks.empty())
489 mProcessorRegistered = false;
490 Dali::Adaptor::Get().UnregisterProcessor(*this);
494 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
498 void AsyncTaskManager::TasksCompleted()
500 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
501 while(AsyncTaskPtr task = PopNextCompletedTask())
503 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
504 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
507 UnregisterProcessor();
508 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
// Processor hook; AddTask() registers this object with the adaptor via RegisterProcessor(*this).
// NOTE(review): the body (listing lines 512-514) is not visible in this view, so what this
// hook runs and how `postProcessor` is used cannot be stated from here — confirm upstream.
511 void AsyncTaskManager::Process(bool postProcessor)
516 /// Worker thread called
// Picks the next waiting task for a worker, under mWaitingTasksMutex.
// Visible flow:
//  - Fast path: when no HIGH priority task is waiting but the queue is not empty, the low
//    priority budget (mAvaliableLowPriorityTaskCounts) is checked under mRunningTasksMutex.
//  - Otherwise the waiting queue is scanned in order for a task whose IsReady() is true.
//    HIGH priority tasks are always eligible; the low priority budget is read under
//    mRunningTasksMutex.
//  - The chosen task is appended to mRunningTasks as RUNNING, its cache records are moved
//    from the waiting cache to the running cache, and the matching counter is decremented
//    (low priority budget for LOW, waiting high priority count for HIGH).
// NOTE(review): many lines are missing from this view (for example listing lines 529-530,
// 534, 536-539, 546-548, 552-559, 576-578, 583-588, 590-593): the early return, the
// condition guarding the low priority check, the assignment of `nextTask`, the loop exit
// and the final return. Restore them from the upstream file before editing.
517 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
519 // Lock while popping task out from the queue
520 Mutex::ScopedLock lock(mWaitingTasksMutex);
522 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
524 // pop out the next task from the queue
525 AsyncTaskPtr nextTask = nullptr;
527 // Fast cut if all waiting tasks are LOW priority, and we cannot execute any more low priority tasks.
528 if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
531 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
533 if(mAvaliableLowPriorityTaskCounts == 0u)
535 // There are no available tasks to run now. Return nullptr.
540 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
542 if((*iter)->IsReady())
544 const auto priorityType = (*iter)->GetPriorityType();
545 bool taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
549 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
551 taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
560 // Lock while popping task out from the queue
561 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
563 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
565 auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
566 CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
// `iter` is invalid after the erase below, so the loop must not advance it afterwards.
568 CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
569 mWaitingTasks.erase(iter);
571 // Decrease available task counts if it is low priority
572 if(priorityType == AsyncTask::PriorityType::LOW)
574 // We are under running task mutex. We can decrease it.
575 --mAvaliableLowPriorityTaskCounts;
579 if(priorityType == AsyncTask::PriorityType::HIGH)
581 // Decrease the number of waiting tasks for high priority.
582 --mWaitingHighProirityTaskCounts;
589 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
594 /// Worker thread called
// Moves a task that a worker has finished processing from the running queue to the
// completed queue.
// Visible flow:
//  1. Under mRunningTasksMutex, look the task up in the running cache and check whether
//     its state is still RunningTaskState::RUNNING.
//  2. Outside any mutex, execute the completed callback right here when `notify` is set
//     and the task asks for invocation on the WORKER_THREAD.
//  3. Under mRunningTasksMutex again: give back the low priority budget for a LOW task;
//     then, additionally holding mCompletedTasksMutex, append the task to mCompletedTasks
//     (REQUIRE_CALLBACK only when `notify` is set and the task wants the MAIN_THREAD) and
//     erase it from the running queue and cache.
//  4. A main thread trigger is requested when a callback is required or when the completed
//     queue has reached FORCE_TRIGGER_THRESHOLD entries.
// NOTE(review): several lines are missing from this view (for example listing lines
// 596-600, 618-621, 673-677, 679-680, 682-686): the declaration and assignment of
// `notify`, any guard on `task`, the statement invalidating `task`, and the call that
// actually fires the trigger. Restore them from the upstream file before editing.
595 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
601 bool needTrigger = false;
603 // Lock while check validation of task.
605 Mutex::ScopedLock lock(mRunningTasksMutex);
607 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
608 if(mapIter != mCacheImpl->mRunningTasksCache.end())
// Only the first cached position of the task is inspected.
610 const auto cacheIter = mapIter->second.begin();
611 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
613 const auto iter = *cacheIter;
614 DALI_ASSERT_DEBUG(iter->first == task);
615 if(iter->second == RunningTaskState::RUNNING)
617 // This task is valid.
622 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
625 // We should execute this tasks complete callback out of mutex
626 if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
628 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
629 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
632 // Lock while adding task to the queue
634 Mutex::ScopedLock lock(mRunningTasksMutex);
636 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
637 if(mapIter != mCacheImpl->mRunningTasksCache.end())
639 const auto cacheIter = mapIter->second.begin();
640 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
642 const auto iter = *cacheIter;
643 const auto priorityType = iter->first->GetPriorityType();
644 // Increase available task counts if it is low priority
645 if(priorityType == AsyncTask::PriorityType::LOW)
647 // We are under running task mutex. We can increase it.
648 ++mAvaliableLowPriorityTaskCounts;
651 // Move task into completed, to ensure that the AsyncTask is destroyed on the main thread.
653 Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
655 const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
657 needTrigger |= callbackRequired;
659 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
661 auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
662 CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
664 CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
665 mRunningTasks.erase(iter);
// Force a trigger when the completed queue has grown to the threshold, even without a pending callback.
669 needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
672 // From here on, the task must be treated as invalid.
678 // Wake up the main thread
681 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
687 // AsyncTaskManager::TaskHelper
689 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
690 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
694 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
695 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
699 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
700 : mProcessor(std::move(processor)),
701 mAsyncTaskManager(asyncTaskManager)
705 bool AsyncTaskManager::TaskHelper::Request()
707 return mProcessor->Request();
709 } // namespace Adaptor
711 } // namespace Internal