[Tizen] Backport tizen_7.5 for Scene3D
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 // CLASS HEADER
19 #include "async-task-manager-impl.h"
20
21 // EXTERNAL INCLUDES
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
27
28 namespace Dali
29 {
30 namespace Internal
31 {
32 namespace Adaptor
33 {
34 namespace
35 {
36 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
37 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
38
39 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
40 {
41   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
42   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
43   constexpr auto MAX_NUMBER_OF_THREADS = 10u;
44   DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
45   return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
46 }
47
48 #if defined(DEBUG_ENABLED)
49 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
50 #endif
51
52 } // unnamed namespace
53
54 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
55 : mConditionalWait(),
56   mAsyncTaskManager(asyncTaskManager),
57   mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
58   mDestroyThread(false),
59   mIsThreadStarted(false),
60   mIsThreadIdle(true)
61 {
62 }
63
64 AsyncTaskThread::~AsyncTaskThread()
65 {
66   // Stop the thread
67   {
68     ConditionalWait::ScopedLock lock(mConditionalWait);
69     mDestroyThread = true;
70     mConditionalWait.Notify(lock);
71   }
72
73   Join();
74 }
75
76 bool AsyncTaskThread::Request()
77 {
78   if(!mIsThreadStarted)
79   {
80     Start();
81     mIsThreadStarted = true;
82   }
83
84   {
85     // Lock while adding task to the queue
86     ConditionalWait::ScopedLock lock(mConditionalWait);
87
88     if(mIsThreadIdle)
89     {
90       mIsThreadIdle = false;
91
92       // wake up the thread
93       mConditionalWait.Notify(lock);
94       return true;
95     }
96   }
97
98   return false;
99 }
100
101 void AsyncTaskThread::Run()
102 {
103   SetThreadName("AsyncTaskThread");
104   mLogFactory.InstallLogFunction();
105
106   while(!mDestroyThread)
107   {
108     AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
109     if(!task)
110     {
111       ConditionalWait::ScopedLock lock(mConditionalWait);
112       if(!mDestroyThread)
113       {
114         mIsThreadIdle = true;
115         mConditionalWait.Wait(lock);
116       }
117     }
118     else
119     {
120       task->Process();
121       mAsyncTaskManager.CompleteTask(task);
122     }
123   }
124 }
125
126 Dali::AsyncTaskManager AsyncTaskManager::Get()
127 {
128   Dali::AsyncTaskManager manager;
129   SingletonService       singletonService(SingletonService::Get());
130   if(singletonService)
131   {
132     // Check whether the async task manager is already created
133     Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
134     if(handle)
135     {
136       // If so, downcast the handle of singleton
137       manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
138     }
139
140     if(!manager)
141     {
142       // If not, create the async task manager and register it as a singleton
143       Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
144       manager                                                       = Dali::AsyncTaskManager(internalAsyncTaskManager);
145       singletonService.Register(typeid(manager), manager);
146     }
147   }
148   return manager;
149 }
150
151 AsyncTaskManager::AsyncTaskManager()
152 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
153   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
154   mProcessorRegistered(false)
155 {
156 }
157
158 AsyncTaskManager::~AsyncTaskManager()
159 {
160   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
161   {
162     Dali::Adaptor::Get().UnregisterProcessor(*this);
163   }
164
165   mTasks.Clear();
166 }
167
168 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
169 {
170   {
171     // Lock while adding task to the queue
172     Mutex::ScopedLock lock(mMutex);
173     mWaitingTasks.push_back(task);
174
175     // Finish all Running threads are working
176     if(mRunningTasks.size() >= mTasks.GetElementCount())
177     {
178       return;
179     }
180   }
181
182   size_t count = mTasks.GetElementCount();
183   size_t index = 0;
184   while(index++ < count)
185   {
186     auto processHelperIt = mTasks.GetNext();
187     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
188     if(processHelperIt->Request())
189     {
190       break;
191     }
192     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
193   }
194
195   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
196   {
197     Dali::Adaptor::Get().RegisterProcessor(*this);
198     mProcessorRegistered = true;
199   }
200
201   return;
202 }
203
204 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
205 {
206   {
207     // Lock while remove task from the queue
208     Mutex::ScopedLock lock(mMutex);
209     if(!mWaitingTasks.empty())
210     {
211       for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
212       {
213         if((*it) && (*it) == task)
214         {
215           it = mWaitingTasks.erase(it);
216         }
217         else
218         {
219           it++;
220         }
221       }
222     }
223
224     if(!mRunningTasks.empty())
225     {
226       for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
227       {
228         if((*iter).first == task)
229         {
230           (*iter).second = true;
231         }
232       }
233     }
234
235     if(!mCompletedTasks.empty())
236     {
237       for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
238       {
239         if((*it) && (*it) == task)
240         {
241           it = mCompletedTasks.erase(it);
242         }
243         else
244         {
245           it++;
246         }
247       }
248     }
249   }
250
251   UnregisterProcessor();
252 }
253
254 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
255 {
256   // Lock while popping task out from the queue
257   Mutex::ScopedLock lock(mMutex);
258
259   // pop out the next task from the queue
260   AsyncTaskPtr nextTask = nullptr;
261
262   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
263   {
264     if((*iter)->IsReady())
265     {
266       nextTask = *iter;
267
268       // Add Running queue
269       mRunningTasks.push_back(std::make_pair(nextTask, false));
270       mWaitingTasks.erase(iter);
271       break;
272     }
273   }
274
275   return nextTask;
276 }
277
278 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
279 {
280   // Lock while popping task out from the queue
281   Mutex::ScopedLock lock(mMutex);
282
283   if(mCompletedTasks.empty())
284   {
285     return AsyncTaskPtr();
286   }
287
288   std::vector<AsyncTaskPtr>::iterator next     = mCompletedTasks.begin();
289   AsyncTaskPtr                        nextTask = *next;
290   mCompletedTasks.erase(next);
291
292   return nextTask;
293 }
294
295 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
296 {
297   // Lock while adding task to the queue
298   {
299     Mutex::ScopedLock lock(mMutex);
300     for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
301     {
302       if((*iter).first == task)
303       {
304         if(!(*iter).second)
305         {
306           if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
307           {
308             mCompletedTasks.push_back(task);
309           }
310         }
311
312         // Delete this task in running queue
313         mRunningTasks.erase(iter);
314         break;
315       }
316     }
317   }
318
319   // wake up the main thread
320   if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
321   {
322     mTrigger->Trigger();
323   }
324   else
325   {
326     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
327   }
328 }
329
330 void AsyncTaskManager::UnregisterProcessor()
331 {
332   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
333   {
334     Mutex::ScopedLock lock(mMutex);
335     if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
336     {
337       Dali::Adaptor::Get().UnregisterProcessor(*this);
338       mProcessorRegistered = false;
339     }
340   }
341 }
342
343 void AsyncTaskManager::TasksCompleted()
344 {
345   while(AsyncTaskPtr task = PopNextCompletedTask())
346   {
347     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
348   }
349
350   UnregisterProcessor();
351 }
352
353 void AsyncTaskManager::Process(bool postProcessor)
354 {
355   TasksCompleted();
356 }
357
358 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
359 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
360 {
361 }
362
363 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
364 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
365 {
366 }
367
368 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
369 : mProcessor(std::move(processor)),
370   mAsyncTaskManager(asyncTaskManager)
371 {
372 }
373
374 bool AsyncTaskManager::TaskHelper::Request()
375 {
376   return mProcessor->Request();
377 }
378 } // namespace Adaptor
379
380 } // namespace Internal
381
382 } // namespace Dali