2 * Copyright (c) 2023 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <dali/internal/system/common/async-task-manager-impl.h>
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
28 #include <unordered_map>
38 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
39 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
41 // The number of threads for low priority task.
42 constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
43 constexpr auto NUMBER_OF_LOW_PRIORITY_THREADS_ENV = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
45 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
47 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
48 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
49 constexpr auto MAX_NUMBER_OF_THREADS = 16u;
50 DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
51 return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
54 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
56 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
57 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
58 DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
59 return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
62 #if defined(DEBUG_ENABLED)
63 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
65 uint32_t gThreadId = 0u; // Only for debug
68 } // unnamed namespace
72 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
74 mAsyncTaskManager(asyncTaskManager),
75 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
76 mDestroyThread(false),
77 mIsThreadStarted(false),
82 AsyncTaskThread::~AsyncTaskThread()
86 ConditionalWait::ScopedLock lock(mConditionalWait);
87 mDestroyThread = true;
88 mConditionalWait.Notify(lock);
94 bool AsyncTaskThread::Request()
99 mIsThreadStarted = true;
103 // Lock while adding task to the queue
104 ConditionalWait::ScopedLock lock(mConditionalWait);
108 mIsThreadIdle = false;
110 // wake up the thread
111 mConditionalWait.Notify(lock);
119 void AsyncTaskThread::Run()
121 #if defined(DEBUG_ENABLED)
122 uint32_t threadId = gThreadId++;
125 snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
129 SetThreadName("AsyncTaskThread");
131 mLogFactory.InstallLogFunction();
133 while(!mDestroyThread)
135 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
138 ConditionalWait::ScopedLock lock(mConditionalWait);
141 mIsThreadIdle = true;
142 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
143 mConditionalWait.Wait(lock);
144 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
149 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
151 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
154 mAsyncTaskManager.CompleteTask(task);
160 // AsyncTaskManager::CacheImpl
162 struct AsyncTaskManager::CacheImpl
164 CacheImpl(AsyncTaskManager& manager)
170 // Insert / Erase task cache API.
173 * @brief Insert cache that input task.
174 * @pre Mutex be locked.
176 template<typename CacheContainer, typename Iterator>
177 static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
179 auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
180 cacheContainer.insert(cacheContainer.end(), iterator);
184 * @brief Erase cache that input task.
185 * @pre Mutex be locked.
187 template<typename CacheContainer, typename Iterator>
188 static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
190 auto mapIter = cacheMap.find(task.Get());
191 if(mapIter != cacheMap.end())
193 auto& cacheContainer = (*mapIter).second;
194 auto cacheIter = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
196 if(cacheIter != cacheContainer.end())
198 cacheContainer.erase(cacheIter);
199 if(cacheContainer.empty())
201 cacheMap.erase(mapIter);
208 * @brief Erase all cache that input task.
209 * @pre Mutex be locked.
211 template<typename CacheContainer>
212 static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
214 auto mapIter = cacheMap.find(task.Get());
215 if(mapIter != cacheMap.end())
217 cacheMap.erase(mapIter);
222 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
224 // Keep cache iterators as list since we take tasks by FIFO as default.
225 using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
226 using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
228 TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
229 RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
230 TaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
235 Dali::AsyncTaskManager AsyncTaskManager::Get()
237 Dali::AsyncTaskManager manager;
238 SingletonService singletonService(SingletonService::Get());
241 // Check whether the async task manager is already created
242 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
245 // If so, downcast the handle of singleton
246 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
251 // If not, create the async task manager and register it as a singleton
252 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
253 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
254 singletonService.Register(typeid(manager), manager);
260 AsyncTaskManager::AsyncTaskManager()
261 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
262 mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
263 mWaitingHighProirityTaskCounts(0u),
264 mCacheImpl(new CacheImpl(*this)),
265 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
266 mProcessorRegistered(false)
270 AsyncTaskManager::~AsyncTaskManager()
272 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
274 mProcessorRegistered = false;
275 Dali::Adaptor::Get().UnregisterProcessor(*this);
281 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
285 // Lock while adding task to the queue
286 Mutex::ScopedLock lock(mWaitingTasksMutex);
288 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
290 // push back into waiting queue.
291 auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
292 CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
294 if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
296 // Increase the number of waiting tasks for high priority.
297 ++mWaitingHighProirityTaskCounts;
302 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
304 // Finish all Running threads are working
305 if(mRunningTasks.size() >= mTasks.GetElementCount())
312 size_t count = mTasks.GetElementCount();
314 while(index++ < count)
316 auto processHelperIt = mTasks.GetNext();
317 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
318 if(processHelperIt->Request())
322 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
325 // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
326 if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
328 Dali::Adaptor::Get().RegisterProcessor(*this);
329 mProcessorRegistered = true;
335 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
339 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
341 // Check whether we need to unregister processor.
342 // If there is some non-empty queue exist, we don't need to unregister processor.
343 bool needCheckUnregisterProcessor = true;
346 // Lock while remove task from the queue
347 Mutex::ScopedLock lock(mWaitingTasksMutex);
349 auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
350 if(mapIter != mCacheImpl->mWaitingTasksCache.end())
352 for(auto& iterator : mapIter->second)
354 DALI_ASSERT_DEBUG((*iterator) == task);
355 if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
357 // Decrease the number of waiting tasks for high priority.
358 --mWaitingHighProirityTaskCounts;
360 mWaitingTasks.erase(iterator);
362 CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
365 if(!mWaitingTasks.empty())
367 needCheckUnregisterProcessor = false;
372 // Lock while remove task from the queue
373 Mutex::ScopedLock lock(mRunningTasksMutex);
375 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
376 if(mapIter != mCacheImpl->mRunningTasksCache.end())
378 for(auto& iterator : mapIter->second)
380 DALI_ASSERT_DEBUG((*iterator).first == task);
381 // We cannot erase container. Just mark as canceled.
382 // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
383 (*iterator).second = RunningTaskState::CANCELED;
387 if(!mRunningTasks.empty())
389 needCheckUnregisterProcessor = false;
394 // Lock while remove task from the queue
395 Mutex::ScopedLock lock(mCompletedTasksMutex);
397 auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
398 if(mapIter != mCacheImpl->mCompletedTasksCache.end())
400 for(auto& iterator : mapIter->second)
402 DALI_ASSERT_DEBUG((*iterator) == task);
403 mCompletedTasks.erase(iterator);
405 CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
408 if(!mCompletedTasks.empty())
410 needCheckUnregisterProcessor = false;
414 // UnregisterProcessor required to lock mutex. Call this API only if required.
415 if(needCheckUnregisterProcessor)
417 UnregisterProcessor();
422 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
424 // Lock while popping task out from the queue
425 Mutex::ScopedLock lock(mCompletedTasksMutex);
427 if(mCompletedTasks.empty())
429 return AsyncTaskPtr();
432 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
434 auto next = mCompletedTasks.begin();
435 AsyncTaskPtr nextTask = *next;
436 CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
437 mCompletedTasks.erase(next);
439 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextTask.Get());
444 void AsyncTaskManager::UnregisterProcessor()
446 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
448 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
449 // Keep processor at least 1 task exist.
450 // Please be careful the order of mutex, to avoid dead lock.
451 // TODO : Should we lock all mutex rightnow?
452 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
453 if(mWaitingTasks.empty())
455 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
456 if(mRunningTasks.empty())
458 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
459 if(mCompletedTasks.empty())
461 mProcessorRegistered = false;
462 Dali::Adaptor::Get().UnregisterProcessor(*this);
466 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
470 void AsyncTaskManager::TasksCompleted()
472 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
473 while(AsyncTaskPtr task = PopNextCompletedTask())
475 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
476 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
479 UnregisterProcessor();
480 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
483 void AsyncTaskManager::Process(bool postProcessor)
488 /// Worker thread called
489 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
491 // Lock while popping task out from the queue
492 Mutex::ScopedLock lock(mWaitingTasksMutex);
494 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
496 // pop out the next task from the queue
497 AsyncTaskPtr nextTask = nullptr;
499 // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
500 if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
503 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
505 if(mAvaliableLowPriorityTaskCounts == 0u)
507 // There are no avaliabe tasks to run now. Return nullptr.
512 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
514 if((*iter)->IsReady())
516 const auto priorityType = (*iter)->GetPriorityType();
517 bool taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
521 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
523 taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
532 // Lock while popping task out from the queue
533 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
535 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
537 auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
538 CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
540 // Decrease avaliable task counts if it is low priority
541 if(priorityType == AsyncTask::PriorityType::LOW)
543 // We are under running task mutex. We can decrease it.
544 --mAvaliableLowPriorityTaskCounts;
548 if(priorityType == AsyncTask::PriorityType::HIGH)
550 // Decrease the number of waiting tasks for high priority.
551 --mWaitingHighProirityTaskCounts;
554 CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
555 mWaitingTasks.erase(iter);
561 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
566 /// Worker thread called
567 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
571 // Lock while adding task to the queue
574 Mutex::ScopedLock lock(mRunningTasksMutex);
576 auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
577 if(mapIter != mCacheImpl->mRunningTasksCache.end())
579 const auto cacheIter = mapIter->second.begin();
580 DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
582 const auto iter = *cacheIter;
583 DALI_ASSERT_DEBUG(iter->first == task);
584 if(iter->second == RunningTaskState::RUNNING)
586 // This task is valid.
590 const auto priorityType = iter->first->GetPriorityType();
591 // Increase avaliable task counts if it is low priority
592 if(priorityType == AsyncTask::PriorityType::LOW)
594 // We are under running task mutex. We can increase it.
595 ++mAvaliableLowPriorityTaskCounts;
597 CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
598 mRunningTasks.erase(iter);
601 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
603 // We should move the task to compeleted task under mRunningTaskMutex.
604 if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
606 Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
608 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
610 auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
611 CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
615 // We should execute this tasks complete callback out of mutex
618 if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
620 // wake up the main thread
621 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
624 else // task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD
626 DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
627 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
632 // AsyncTaskManager::TaskHelper
634 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
635 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
639 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
640 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
644 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
645 : mProcessor(std::move(processor)),
646 mAsyncTaskManager(asyncTaskManager)
650 bool AsyncTaskManager::TaskHelper::Request()
652 return mProcessor->Request();
654 } // namespace Adaptor
656 } // namespace Internal