2 * Copyright (c) 2022 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include "async-task-manager-impl.h"
22 #include <dali/devel-api/common/singleton-service.h>
23 #include <dali/devel-api/adaptor-framework/environment-variable.h>
24 #include <dali/devel-api/adaptor-framework/thread-settings.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
36 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
37 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
39 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
41 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
42 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
43 constexpr auto MAX_NUMBER_OF_THREADS = 10u;
44 DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
45 return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
48 #if defined(DEBUG_ENABLED)
49 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
52 } // unnamed namespace
54 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
56 mAsyncTaskManager(asyncTaskManager),
57 mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
58 mDestroyThread(false),
59 mIsThreadStarted(false),
64 AsyncTaskThread::~AsyncTaskThread()
68 ConditionalWait::ScopedLock lock(mConditionalWait);
69 mDestroyThread = true;
70 mConditionalWait.Notify(lock);
76 bool AsyncTaskThread::Request()
81 mIsThreadStarted = true;
85 // Lock while adding task to the queue
86 ConditionalWait::ScopedLock lock(mConditionalWait);
90 mIsThreadIdle = false;
93 mConditionalWait.Notify(lock);
101 void AsyncTaskThread::Run()
103 SetThreadName("AsyncTaskThread");
104 mLogFactory.InstallLogFunction();
106 while(!mDestroyThread)
108 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
111 ConditionalWait::ScopedLock lock(mConditionalWait);
112 mIsThreadIdle = true;
113 mConditionalWait.Wait(lock);
118 mAsyncTaskManager.CompleteTask(task);
123 Dali::AsyncTaskManager AsyncTaskManager::Get()
125 Dali::AsyncTaskManager manager;
126 SingletonService singletonService(SingletonService::Get());
129 // Check whether the async task manager is already created
130 Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
133 // If so, downcast the handle of singleton
134 manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
139 // If not, create the async task manager and register it as a singleton
140 Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
141 manager = Dali::AsyncTaskManager(internalAsyncTaskManager);
142 singletonService.Register(typeid(manager), manager);
148 AsyncTaskManager::AsyncTaskManager()
149 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
150 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
151 mProcessorRegistered(false)
155 AsyncTaskManager::~AsyncTaskManager()
157 if(mProcessorRegistered)
159 Dali::Adaptor::Get().UnregisterProcessor(*this);
165 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
168 // Lock while adding task to the queue
169 Mutex::ScopedLock lock(mMutex);
170 mWaitingTasks.push_back(task);
172 // Finish all Running threads are working
173 if(mRunningTasks.size() >= mTasks.GetElementCount())
179 size_t count = mTasks.GetElementCount();
181 while(index++ < count)
183 auto processHelperIt = mTasks.GetNext();
184 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
185 if(processHelperIt->Request())
189 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
192 if(!mProcessorRegistered)
194 Dali::Adaptor::Get().RegisterProcessor(*this);
195 mProcessorRegistered = true;
201 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
204 // Lock while remove task from the queue
205 Mutex::ScopedLock lock(mMutex);
206 if(!mWaitingTasks.empty())
208 for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
210 if((*it) && (*it) == task)
212 it = mWaitingTasks.erase(it);
221 if(!mRunningTasks.empty())
223 for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
225 if((*iter).first == task)
227 (*iter).second = true;
232 if(!mCompletedTasks.empty())
234 for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
236 if((*it) && (*it) == task)
238 it = mCompletedTasks.erase(it);
248 UnregisterProcessor();
251 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
253 // Lock while popping task out from the queue
254 Mutex::ScopedLock lock(mMutex);
256 // pop out the next task from the queue
257 AsyncTaskPtr nextTask = nullptr;
259 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
261 if((*iter)->IsReady())
266 mRunningTasks.push_back(std::make_pair(nextTask, false));
267 mWaitingTasks.erase(iter);
275 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
277 // Lock while popping task out from the queue
278 Mutex::ScopedLock lock(mMutex);
280 if(mCompletedTasks.empty())
282 return AsyncTaskPtr();
285 std::vector<AsyncTaskPtr>::iterator next = mCompletedTasks.begin();
286 AsyncTaskPtr nextTask = *next;
287 mCompletedTasks.erase(next);
292 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
294 // Lock while adding task to the queue
295 Mutex::ScopedLock lock(mMutex);
296 for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
298 if((*iter).first == task)
302 mCompletedTasks.push_back(task);
305 // Delete this task in running queue
306 mRunningTasks.erase(iter);
311 // wake up the main thread
315 void AsyncTaskManager::UnregisterProcessor()
317 if(mProcessorRegistered)
319 Mutex::ScopedLock lock(mMutex);
320 if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
322 Dali::Adaptor::Get().UnregisterProcessor(*this);
323 mProcessorRegistered = false;
328 void AsyncTaskManager::TasksCompleted()
330 while(AsyncTaskPtr task = PopNextCompletedTask())
332 CallbackBase::Execute(*(task->GetCompletedCallback()),task);
335 UnregisterProcessor();
338 void AsyncTaskManager::Process(bool postProcessor)
343 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
344 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
348 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
349 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
353 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
354 : mProcessor(std::move(processor)),
355 mAsyncTaskManager(asyncTaskManager)
359 bool AsyncTaskManager::TaskHelper::Request()
361 return mProcessor->Request();
363 } // namespace Adaptor
365 } // namespace Internal