2 * Copyright (c) 2023 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include "async-task-manager-impl.h"
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
// Thread-pool size passed to GetNumberOfThreads() as the fallback value.
36 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
// Name of the environment variable that GetNumberOfThreads() reads to override the thread-pool size.
37 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
39 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
41 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
42 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
43 constexpr auto MAX_NUMBER_OF_THREADS = 10u;
44 DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
45 return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
// Log filter for this file, defined only when DEBUG_ENABLED is set and created with Debug::NoLogging.
// NOTE(review): "LOG_ASYNC_TASK_MANAGER" is presumably the name used to enable it at runtime - confirm.
// NOTE(review): the matching #endif is not visible in this listing.
48 #if defined(DEBUG_ENABLED)
49 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
52 } // unnamed namespace
// Constructs a worker thread object bound to the given task manager.
// The log factory is taken from the current adaptor, and both the destroy and
// started flags begin as false.
// NOTE(review): further member initialisers may exist that are not visible here - confirm against the header.
54 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
56 mAsyncTaskManager(asyncTaskManager),
57 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
58 mDestroyThread(false),
59 mIsThreadStarted(false),
// Destructor: requests that the Run() loop stops.
64 AsyncTaskThread::~AsyncTaskThread()
// Take the same lock that Run() holds before it waits.
68 ConditionalWait::ScopedLock lock(mConditionalWait);
// Run() tests this flag in its while condition.
69 mDestroyThread = true;
// Wake the thread in case it is blocked in Wait().
70 mConditionalWait.Notify(lock);
// NOTE(review): presumably the thread is joined after this point - that statement is not visible here, confirm.
// Asks this worker thread to pick up work and reports the outcome as a bool.
// NOTE(review): the conditions guarding the statements below and the return statements are not
// visible here; presumably the thread is started on first use and true is returned only when an
// idle thread was woken - confirm.
76 bool AsyncTaskThread::Request()
// Record that the thread has been started.
81 mIsThreadStarted = true;
85 // Lock before updating the idle flag and waking the thread
86 ConditionalWait::ScopedLock lock(mConditionalWait);
// The thread is about to be given work, so it is no longer idle.
90 mIsThreadIdle = false;
// Wake Run(), which waits on the same ConditionalWait.
93 mConditionalWait.Notify(lock);
// Thread body: names the thread, installs the log function, then loops until mDestroyThread is set.
101 void AsyncTaskThread::Run()
103 SetThreadName("AsyncTaskThread");
104 mLogFactory.InstallLogFunction();
// The destructor sets mDestroyThread to end this loop.
106 while(!mDestroyThread)
// Ask the manager for the next task to work on.
108 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
// NOTE(review): presumably the wait below is taken only when no task was returned - the condition is not visible here.
111 ConditionalWait::ScopedLock lock(mConditionalWait);
// Mark the thread idle and block until Notify() is called (by Request() or the destructor).
114 mIsThreadIdle = true;
115 mConditionalWait.Wait(lock);
// NOTE(review): the statement that actually processes the task is not visible here - confirm it precedes this call.
// Hand the task back to the manager.
121 mAsyncTaskManager.CompleteTask(task);
// Obtains the AsyncTaskManager handle through the SingletonService, either by looking up an
// already-registered instance or by creating and registering a new one.
// NOTE(review): the branch conditions (service/handle validity checks) and the return statement
// are not visible here - presumably `manager` is returned; confirm.
126 Dali::AsyncTaskManager AsyncTaskManager::Get()
// Starts as an empty handle.
128 Dali::AsyncTaskManager manager;
129 SingletonService singletonService(SingletonService::Get());
132 // Check whether the async task manager is already created
133 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
136 // If so, downcast the handle of singleton
137 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
142 // If not, create the async task manager and register it as a singleton
143 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
// The raw pointer is wrapped in a handle.
144 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
// typeid(manager) is typeid(Dali::AsyncTaskManager), the key used for the lookup above.
145 singletonService.Register(typeid(manager), manager);
151 AsyncTaskManager::AsyncTaskManager()
152 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
153 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
154 mProcessorRegistered(false)
// Destructor: removes this object from the adaptor's processors, but only if it was
// registered and the adaptor still exists.
// NOTE(review): any further clean-up after this branch is not visible here - confirm.
158 AsyncTaskManager::~AsyncTaskManager()
160 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
162 Dali::Adaptor::Get().UnregisterProcessor(*this);
// Queues a task for asynchronous processing, tries to hand it to a worker thread and makes sure
// this manager is registered as an adaptor processor.
// NOTE(review): several short statements (scope braces, the body of the first `if`, the `index`
// declaration, loop exits) are not visible here; the comments below describe only what is shown.
168 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
171 // Lock while adding task to the queue
172 Mutex::ScopedLock lock(mMutex);
173 mWaitingTasks.push_back(task);
175 // Check whether every thread already has a running task
176 if(mRunningTasks.size() >= mTasks.GetElementCount())
// Visit each helper at most once.
182 size_t count = mTasks.GetElementCount();
184 while(index++ < count)
// GetNext() is asserted never to return End().
186 auto processHelperIt = mTasks.GetNext();
187 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
// Request() reports whether this helper's thread accepted the request.
188 if(processHelperIt->Request())
192 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
// Register as a processor once, and only when an adaptor exists.
195 if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
197 Dali::Adaptor::Get().RegisterProcessor(*this);
198 mProcessorRegistered = true;
// Removes a task from the manager: erases it from the waiting and completed lists and sets the
// flag stored next to it in the running list.
// NOTE(review): the else-branches that advance the erase loops and the lock scope are not visible here.
204 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
207 // Lock while removing the task from the queues
208 Mutex::ScopedLock lock(mMutex);
209 if(!mWaitingTasks.empty())
// No increment in the for-header: erase() returns the next iterator.
211 for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
213 if((*it) && (*it) == task)
215 it = mWaitingTasks.erase(it);
224 if(!mRunningTasks.empty())
226 for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
228 if((*iter).first == task)
// A running task is not erased here; only its flag is set.
// NOTE(review): presumably this marks the task as cancelled so its completion is ignored - confirm.
230 (*iter).second = true;
235 if(!mCompletedTasks.empty())
237 for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
239 if((*it) && (*it) == task)
241 it = mCompletedTasks.erase(it);
// UnregisterProcessor() unregisters only when all three task lists are empty.
// NOTE(review): it locks mMutex itself, so this call presumably sits outside the lock scope above - confirm.
251 UnregisterProcessor();
// Searches the waiting list for a task whose IsReady() is true and moves it to the running list.
// NOTE(review): the assignment to nextTask, the loop exit and the return statement are not visible
// here; presumably the first ready task is returned, or a null pointer if none is ready - confirm.
254 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
256 // Lock while popping task out from the queue
257 Mutex::ScopedLock lock(mMutex);
259 // pop out the next task from the queue
260 AsyncTaskPtr nextTask = nullptr;
262 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
264 if((*iter)->IsReady())
// The task is recorded as running with its flag cleared (false).
269 mRunningTasks.push_back(std::make_pair(nextTask, false));
// erase() invalidates iter and the cached endIter, so the loop must not continue after this.
// NOTE(review): confirm a break/return follows.
270 mWaitingTasks.erase(iter);
// Takes the first entry out of the completed list; yields an empty AsyncTaskPtr when the list is empty.
// NOTE(review): the final return statement is not visible here - presumably nextTask is returned.
278 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
280 // Lock while popping task out from the queue
281 Mutex::ScopedLock lock(mMutex);
283 if(mCompletedTasks.empty())
285 return AsyncTaskPtr();
// Copy the front element before erasing it from the vector.
288 std::vector<AsyncTaskPtr>::iterator next = mCompletedTasks.begin();
289 AsyncTaskPtr nextTask = *next;
290 mCompletedTasks.erase(next);
// Called when a task has finished: removes it from the running list and, depending on the task's
// callback invocation thread, either queues it in mCompletedTasks or executes its completed callback.
// NOTE(review): several short statements (a check of the running entry's flag, loop exit, the
// trigger call, else-branches) are not visible here; the comments describe only what is shown.
295 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
297 // Lock while moving the task out of the running list
299 Mutex::ScopedLock lock(mMutex);
300 for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
302 if((*iter).first == task)
// Only tasks whose callback runs on the main thread are queued.
306 if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
308 mCompletedTasks.push_back(task);
312 // Delete this task in running queue
// erase() invalidates iter and the cached endIter.
// NOTE(review): confirm the loop exits right after this.
313 mRunningTasks.erase(iter);
319 // wake up the main thread
320 if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
// NOTE(review): presumably this is the non-main-thread branch, running the callback on the calling thread - confirm.
326 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
330 void AsyncTaskManager::UnregisterProcessor()
332 if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
334 Mutex::ScopedLock lock(mMutex);
335 if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
337 Dali::Adaptor::Get().UnregisterProcessor(*this);
338 mProcessorRegistered = false;
343 void AsyncTaskManager::TasksCompleted()
345 while(AsyncTaskPtr task = PopNextCompletedTask())
347 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
350 UnregisterProcessor();
// Processor callback; this object is registered via RegisterProcessor(*this) in AddTask().
// NOTE(review): the body is not visible in this listing - presumably it is empty; confirm.
353 void AsyncTaskManager::Process(bool postProcessor)
358 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
359 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
363 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
364 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
368 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
369 : mProcessor(std::move(processor)),
370 mAsyncTaskManager(asyncTaskManager)
374 bool AsyncTaskManager::TaskHelper::Request()
376 return mProcessor->Request();
378 } // namespace Adaptor
380 } // namespace Internal