2 * Copyright (c) 2023 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <dali/internal/system/common/async-task-manager-impl.h>
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
// Default size of the worker thread pool and the environment variable that can override it.
// Both are handed to GetNumberOfThreads() by the AsyncTaskManager constructor.
36 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
37 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
39 // Default budget of low priority tasks that may be running at the same time, and the
// environment variable that can override it. Both are handed to GetNumberOfLowPriorityThreads(),
// which never returns more than the maxValue it is given.
40 constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
41 constexpr auto NUMBER_OF_LOW_PRIORITY_THREADS_ENV = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
// Reads a thread count from the named environment variable.
//   environmentVariable : name of the variable to look up.
//   defaultValue        : returned when the parsed value is 0 or is 10 or more.
// Returns the parsed base-10 value when it lies in [1, 9]; otherwise defaultValue.
43 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
45 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
// A null lookup result counts as 0; std::strtoul also yields 0 when no digits can be converted.
46 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
// Exclusive upper bound: the value 10 itself is rejected below.
47 constexpr auto MAX_NUMBER_OF_THREADS = 10u;
// Flags an over-large request (presumably in debug builds only, as the macro name suggests);
// the return statement below still falls back to defaultValue for such values.
48 DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
49 return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
// Reads the low priority budget from the named environment variable.
//   environmentVariable : name of the variable to look up.
//   defaultValue        : fallback, itself capped at maxValue.
//   maxValue            : inclusive upper bound for the result.
// Returns the parsed base-10 value when it lies in [1, maxValue];
// otherwise the smaller of defaultValue and maxValue.
52 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
54 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
// A null lookup result counts as 0; std::strtoul also yields 0 when no digits can be converted.
55 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
// Unlike GetNumberOfThreads(), the bound here is inclusive.
56 DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
57 return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
// Log filter for this file, compiled in only when DEBUG_ENABLED is defined.
// It is created with Debug::NoLogging as its level.
// NOTE(review): "LOG_ASYNC_TASK_MANAGER" is presumably the environment variable that
// controls the filter - confirm against Debug::Filter::New.
60 #if defined(DEBUG_ENABLED)
61 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
64 } // unnamed namespace
// Constructs a worker thread object bound to the manager it will pull tasks from.
//   asyncTaskManager : stored by reference; Run() pops tasks from it and reports completion to it.
// NOTE(review): parts of the initializer list are not visible in this excerpt.
66 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
68 mAsyncTaskManager(asyncTaskManager),
// Obtained via Dali::Adaptor::Get() at construction time; Run() calls InstallLogFunction() on it.
69 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
// Run() loops until this becomes true; the destructor sets it.
70 mDestroyThread(false),
// Set to true in Request().
71 mIsThreadStarted(false),
// Destructor: asks the Run() loop to finish.
// NOTE(review): any statements after the notify are not visible in this excerpt.
76 AsyncTaskThread::~AsyncTaskThread()
// Take the conditional-wait lock so the flag change and the notify are seen together by Run().
80 ConditionalWait::ScopedLock lock(mConditionalWait);
// Run() tests this flag in its loop condition.
81 mDestroyThread = true;
// Wake the thread in case it is blocked in mConditionalWait.Wait().
82 mConditionalWait.Notify(lock);
// Asks this thread to start looking for work.
// Returns a bool that AsyncTaskManager::AddTask() tests through TaskHelper::Request().
// NOTE(review): the guarding conditions and the return statements are not visible in this
// excerpt, so the exact meaning of true/false cannot be stated from here.
88 bool AsyncTaskThread::Request()
// Records that the thread has been started.
93 mIsThreadStarted = true;
97 // Hold the conditional-wait lock while changing the idle flag and notifying
98 ConditionalWait::ScopedLock lock(mConditionalWait);
// Run() sets this flag back to true before it waits.
102 mIsThreadIdle = false;
104 // wake up the thread
105 mConditionalWait.Notify(lock);
// Thread body: names the thread, installs the log function, then keeps asking the manager
// for tasks until mDestroyThread is set.
// NOTE(review): the branch conditions and the call that executes the task are not visible
// in this excerpt; presumably the wait happens when no task was popped.
113 void AsyncTaskThread::Run()
115 SetThreadName("AsyncTaskThread");
116 mLogFactory.InstallLogFunction();
118 while(!mDestroyThread)
// Ask the manager for the next task to process.
120 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
// Going idle: the flag is set under the lock, then the thread blocks until Request() or the
// destructor notifies.
123 ConditionalWait::ScopedLock lock(mConditionalWait);
126 mIsThreadIdle = true;
127 mConditionalWait.Wait(lock);
// Hand the task back to the manager so it can be completed.
133 mAsyncTaskManager.CompleteTask(task);
// Returns the AsyncTaskManager handle held by the SingletonService, creating and registering
// one if none is found.
// NOTE(review): the if-guards and the return statement are not visible in this excerpt.
138 Dali::AsyncTaskManager AsyncTaskManager::Get()
140 Dali::AsyncTaskManager manager;
141 SingletonService singletonService(SingletonService::Get());
144 // Check whether the async task manager is already created
145 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
148 // If so, downcast the handle of singleton
149 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
154 // If not, create the async task manager and register it as a singleton
155 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
// The raw pointer is wrapped in a handle straight away.
156 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
// typeid(manager) is typeid(Dali::AsyncTaskManager), the same key used for the lookup above.
157 singletonService.Register(typeid(manager), manager);
// Builds the thread pool container, the low priority budget and the completion trigger.
163 AsyncTaskManager::AsyncTaskManager()
// Pool size comes from DALI_ASYNC_MANAGER_THREAD_POOL_SIZE (default 8); the lambda supplies
// the TaskHelper elements, each bound to this manager.
164 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
// Low priority budget, capped at the pool size.
// NOTE(review): reads mTasks, so this relies on mTasks being declared before this member - confirm in the header.
165 mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
// Triggering this callback runs TasksCompleted() on this manager.
166 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
// Set to true in AddTask() once the manager has registered itself with the adaptor.
167 mProcessorRegistered(false)
// Destructor: removes this manager from the adaptor's processors if it is still registered.
// NOTE(review): any further statements of the destructor are not visible in this excerpt.
171 AsyncTaskManager::~AsyncTaskManager()
// Only touch the adaptor when it still exists.
173 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
175 Dali::Adaptor::Get().UnregisterProcessor(*this);
// Queues a task and asks pool threads to pick it up.
//   task : appended to mWaitingTasks under mMutex.
// The manager also registers itself as an adaptor processor the first time this is possible.
// NOTE(review): the scope braces, the declaration of index and the bodies of the two
// if-statements that test running-task count and Request() are not visible in this excerpt.
181 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
185 // Lock while adding task to the queue
186 Mutex::ScopedLock lock(mMutex);
188 mWaitingTasks.push_back(task);
190 // Check whether every thread in the pool already has a running task
191 if(mRunningTasks.size() >= mTasks.GetElementCount())
// Probe at most one full round of the pool.
197 size_t count = mTasks.GetElementCount();
199 while(index++ < count)
201 auto processHelperIt = mTasks.GetNext();
202 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
// Forwards to AsyncTaskThread::Request() for that pool entry.
203 if(processHelperIt->Request())
207 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
// Register once, and only while an adaptor exists.
210 if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
212 Dali::Adaptor::Get().RegisterProcessor(*this);
213 mProcessorRegistered = true;
// Removes a task from the manager's bookkeeping.
//   task : erased from mWaitingTasks and mCompletedTasks where it matches; a matching entry
//          in mRunningTasks is not erased, only flagged.
// Finishes by calling UnregisterProcessor().
// NOTE(review): the else-branches that advance the iterators and the scope braces are not
// visible in this excerpt.
219 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
223 // Lock while removing the task from the queues
224 Mutex::ScopedLock lock(mMutex);
226 if(!mWaitingTasks.empty())
228 for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
230 if((*it) && (*it) == task)
// erase() returns the iterator to the following element, so the loop header has no increment.
232 it = mWaitingTasks.erase(it);
241 if(!mRunningTasks.empty())
243 for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
245 if((*iter).first == task)
// The flag starts as false when PopNextTaskToProcess() adds the entry.
// NOTE(review): presumably marks the running task as cancelled so its completion is not
// reported - the line that reads the flag is not visible here.
247 (*iter).second = true;
252 if(!mCompletedTasks.empty())
254 for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
256 if((*it) && (*it) == task)
258 it = mCompletedTasks.erase(it);
// Unregisters the processor if nothing is left in any of the three lists.
268 UnregisterProcessor();
// Picks a waiting task for a worker thread (called from AsyncTaskThread::Run()).
// Scans mWaitingTasks in order, looking at tasks whose IsReady() is true; a high priority
// task always qualifies, a low priority one only while the low priority budget is above zero.
// The chosen task is moved from mWaitingTasks to mRunningTasks.
// NOTE(review): the assignment to nextTask, the loop exit and the return statement are not
// visible in this excerpt.
271 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
273 // Lock while popping task out from the queue
274 Mutex::ScopedLock lock(mMutex);
276 // pop out the next task from the queue
277 AsyncTaskPtr nextTask = nullptr;
279 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
281 if((*iter)->IsReady())
283 const auto priorityType = (*iter)->GetPriorityType();
284 const bool taskAvaliable = (priorityType == AsyncTask::PriorityType::HIGH) || // A high priority task can always be taken
285 (mAvaliableLowPriorityTaskCounts > 0u); // a low priority task needs budget left
// The running entry starts with its flag cleared; RemoveTask() may set it later.
292 mRunningTasks.push_back(std::make_pair(nextTask, false));
// Erasing invalidates iter and endIter, so the loop must not continue after this point.
293 mWaitingTasks.erase(iter);
295 // Decrease the available low priority budget if the task is low priority
296 if(priorityType == AsyncTask::PriorityType::LOW)
298 --mAvaliableLowPriorityTaskCounts;
// Takes the oldest entry out of mCompletedTasks.
// Returns an empty AsyncTaskPtr when the list is empty.
// NOTE(review): the final return statement is not visible in this excerpt; presumably it
// returns nextTask.
308 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
310 // Lock while popping task out from the queue
311 Mutex::ScopedLock lock(mMutex);
313 if(mCompletedTasks.empty())
315 return AsyncTaskPtr();
// Copy the front element before erasing it from the vector.
318 std::vector<AsyncTaskPtr>::iterator next = mCompletedTasks.begin();
319 AsyncTaskPtr nextTask = *next;
320 mCompletedTasks.erase(next);
// Reports a task as finished (called from AsyncTaskThread::Run()).
//   task : looked up in mRunningTasks; its entry is erased, and a low priority task gives
//          its budget slot back. Tasks whose callback thread is MAIN_THREAD can be appended
//          to mCompletedTasks.
// NOTE(review): the check guarding the push to mCompletedTasks, the loop exit, the statement
// that wakes the main thread and the else keyword are not visible in this excerpt.
325 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
327 // Lock while updating the running and completed lists
329 Mutex::ScopedLock lock(mMutex);
331 for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
333 if((*iter).first == task)
337 if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
// TasksCompleted() later pops this entry and runs its callback.
339 mCompletedTasks.push_back(task);
343 // Delete this task in running queue
// Erasing invalidates iter and endIter, so the loop must not continue after this point.
344 mRunningTasks.erase(iter);
346 // Increase the available low priority budget if the task is low priority
347 const auto priorityType = task->GetPriorityType();
348 if(priorityType == AsyncTask::PriorityType::LOW)
350 ++mAvaliableLowPriorityTaskCounts;
357 // wake up the main thread
358 if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
// Presumably the branch for tasks that are not MAIN_THREAD: the completed callback is
// executed directly by the caller of CompleteTask().
364 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
// Removes this manager from the adaptor's processors once it has no work left.
// Does nothing unless the manager is registered and an adaptor is available.
368 void AsyncTaskManager::UnregisterProcessor()
// NOTE(review): mProcessorRegistered is read here before mMutex is taken.
370 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
372 Mutex::ScopedLock lock(mMutex);
// Only unregister when the waiting, completed and running lists are all empty.
373 if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
375 Dali::Adaptor::Get().UnregisterProcessor(*this);
// Lets AddTask() register the manager again later.
376 mProcessorRegistered = false;
// Callback bound to mTrigger in the constructor.
// Drains mCompletedTasks, executing each task's completed callback with the task as its
// argument, then unregisters the processor if no work is left.
381 void AsyncTaskManager::TasksCompleted()
// PopNextCompletedTask() returns an empty pointer once the list is empty, ending the loop.
383 while(AsyncTaskPtr task = PopNextCompletedTask())
385 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
388 UnregisterProcessor();
// NOTE(review): presumably the processor entry point invoked by the adaptor after
// RegisterProcessor(*this) in AddTask(); the body is not visible in this excerpt - confirm.
391 void AsyncTaskManager::Process(bool postProcessor)
// Creates a helper that owns a newly allocated AsyncTaskThread bound to the given manager.
// Delegates to the (processor, manager) constructor.
396 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
397 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
// Move constructor: takes over rhs's thread object and refers to the same manager.
// rhs.mProcessor is left empty by the move.
401 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
402 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
// Target of the two delegating constructors.
//   processor        : thread object whose ownership moves into mProcessor.
//   asyncTaskManager : manager stored in mAsyncTaskManager.
406 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
407 : mProcessor(std::move(processor)),
408 mAsyncTaskManager(asyncTaskManager)
// Forwards to AsyncTaskThread::Request() on the owned thread and returns its result.
// mProcessor is dereferenced without a null check, so it must not be called on a
// moved-from helper.
412 bool AsyncTaskManager::TaskHelper::Request()
414 return mProcessor->Request();
416 } // namespace Adaptor
418 } // namespace Internal