Add GetCallbackInvocationThread() for AsyncTaskManager
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 // CLASS HEADER
19 #include "async-task-manager-impl.h"
20
21 // EXTERNAL INCLUDES
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
27
28 namespace Dali
29 {
30 namespace Internal
31 {
32 namespace Adaptor
33 {
34 namespace
35 {
// Thread-pool size used when the environment does not supply a usable value.
constexpr size_t DEFAULT_NUMBER_OF_ASYNC_THREADS = 8u;
// Name of the environment variable that is read to obtain the thread-pool size.
constexpr const char* NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
38
39 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
40 {
41   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
42   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
43   constexpr auto MAX_NUMBER_OF_THREADS = 10u;
44   DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
45   return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
46 }
47
#ifdef DEBUG_ENABLED
// Log filter for the async task manager; created with Debug::NoLogging and only compiled when DEBUG_ENABLED is defined.
Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
#endif // DEBUG_ENABLED
51
52 } // unnamed namespace
53
54 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
55 : mConditionalWait(),
56   mAsyncTaskManager(asyncTaskManager),
57   mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
58   mDestroyThread(false),
59   mIsThreadStarted(false),
60   mIsThreadIdle(true)
61 {
62 }
63
64 AsyncTaskThread::~AsyncTaskThread()
65 {
66   // Stop the thread
67   {
68     ConditionalWait::ScopedLock lock(mConditionalWait);
69     mDestroyThread = true;
70     mConditionalWait.Notify(lock);
71   }
72
73   Join();
74 }
75
76 bool AsyncTaskThread::Request()
77 {
78   if(!mIsThreadStarted)
79   {
80     Start();
81     mIsThreadStarted = true;
82   }
83
84   {
85     // Lock while adding task to the queue
86     ConditionalWait::ScopedLock lock(mConditionalWait);
87
88     if(mIsThreadIdle)
89     {
90       mIsThreadIdle = false;
91
92       // wake up the thread
93       mConditionalWait.Notify(lock);
94       return true;
95     }
96   }
97
98   return false;
99 }
100
101 void AsyncTaskThread::Run()
102 {
103   SetThreadName("AsyncTaskThread");
104   mLogFactory.InstallLogFunction();
105
106   while(!mDestroyThread)
107   {
108     AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
109     if(!task)
110     {
111       ConditionalWait::ScopedLock lock(mConditionalWait);
112       mIsThreadIdle = true;
113       mConditionalWait.Wait(lock);
114     }
115     else
116     {
117       task->Process();
118       mAsyncTaskManager.CompleteTask(task);
119     }
120   }
121 }
122
123 Dali::AsyncTaskManager AsyncTaskManager::Get()
124 {
125   Dali::AsyncTaskManager manager;
126   SingletonService       singletonService(SingletonService::Get());
127   if(singletonService)
128   {
129     // Check whether the async task manager is already created
130     Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
131     if(handle)
132     {
133       // If so, downcast the handle of singleton
134       manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
135     }
136
137     if(!manager)
138     {
139       // If not, create the async task manager and register it as a singleton
140       Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
141       manager                                                       = Dali::AsyncTaskManager(internalAsyncTaskManager);
142       singletonService.Register(typeid(manager), manager);
143     }
144   }
145   return manager;
146 }
147
148 AsyncTaskManager::AsyncTaskManager()
149 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
150   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
151   mProcessorRegistered(false)
152 {
153 }
154
155 AsyncTaskManager::~AsyncTaskManager()
156 {
157   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
158   {
159     Dali::Adaptor::Get().UnregisterProcessor(*this);
160   }
161
162   mTasks.Clear();
163 }
164
165 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
166 {
167   {
168     // Lock while adding task to the queue
169     Mutex::ScopedLock lock(mMutex);
170     mWaitingTasks.push_back(task);
171
172     // Finish all Running threads are working
173     if(mRunningTasks.size() >= mTasks.GetElementCount())
174     {
175       return;
176     }
177   }
178
179   size_t count = mTasks.GetElementCount();
180   size_t index = 0;
181   while(index++ < count)
182   {
183     auto processHelperIt = mTasks.GetNext();
184     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
185     if(processHelperIt->Request())
186     {
187       break;
188     }
189     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
190   }
191
192   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
193   {
194     Dali::Adaptor::Get().RegisterProcessor(*this);
195     mProcessorRegistered = true;
196   }
197
198   return;
199 }
200
201 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
202 {
203   {
204     // Lock while remove task from the queue
205     Mutex::ScopedLock lock(mMutex);
206     if(!mWaitingTasks.empty())
207     {
208       for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
209       {
210         if((*it) && (*it) == task)
211         {
212           it = mWaitingTasks.erase(it);
213         }
214         else
215         {
216           it++;
217         }
218       }
219     }
220
221     if(!mRunningTasks.empty())
222     {
223       for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
224       {
225         if((*iter).first == task)
226         {
227           (*iter).second = true;
228         }
229       }
230     }
231
232     if(!mCompletedTasks.empty())
233     {
234       for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
235       {
236         if((*it) && (*it) == task)
237         {
238           it = mCompletedTasks.erase(it);
239         }
240         else
241         {
242           it++;
243         }
244       }
245     }
246   }
247
248   UnregisterProcessor();
249 }
250
251 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
252 {
253   // Lock while popping task out from the queue
254   Mutex::ScopedLock lock(mMutex);
255
256   // pop out the next task from the queue
257   AsyncTaskPtr nextTask = nullptr;
258
259   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
260   {
261     if((*iter)->IsReady())
262     {
263       nextTask = *iter;
264
265       // Add Running queue
266       mRunningTasks.push_back(std::make_pair(nextTask, false));
267       mWaitingTasks.erase(iter);
268       break;
269     }
270   }
271
272   return nextTask;
273 }
274
275 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
276 {
277   // Lock while popping task out from the queue
278   Mutex::ScopedLock lock(mMutex);
279
280   if(mCompletedTasks.empty())
281   {
282     return AsyncTaskPtr();
283   }
284
285   std::vector<AsyncTaskPtr>::iterator next     = mCompletedTasks.begin();
286   AsyncTaskPtr                        nextTask = *next;
287   mCompletedTasks.erase(next);
288
289   return nextTask;
290 }
291
292 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
293 {
294   // Lock while adding task to the queue
295   {
296     Mutex::ScopedLock lock(mMutex);
297     for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
298     {
299       if((*iter).first == task)
300       {
301         if(!(*iter).second)
302         {
303           if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
304           {
305             mCompletedTasks.push_back(task);
306           }
307         }
308
309         // Delete this task in running queue
310         mRunningTasks.erase(iter);
311         break;
312       }
313     }
314   }
315
316   // wake up the main thread
317   if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
318   {
319     mTrigger->Trigger();
320   }
321   else
322   {
323     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
324   }
325 }
326
327 void AsyncTaskManager::UnregisterProcessor()
328 {
329   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
330   {
331     Mutex::ScopedLock lock(mMutex);
332     if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
333     {
334       Dali::Adaptor::Get().UnregisterProcessor(*this);
335       mProcessorRegistered = false;
336     }
337   }
338 }
339
340 void AsyncTaskManager::TasksCompleted()
341 {
342   while(AsyncTaskPtr task = PopNextCompletedTask())
343   {
344     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
345   }
346
347   UnregisterProcessor();
348 }
349
/**
 * @brief Processor entry point; forwards to TasksCompleted().
 *
 * @param[in] postProcessor Not used by this implementation.
 */
void AsyncTaskManager::Process(bool postProcessor)
{
  TasksCompleted();
}
354
/**
 * @brief Creates a helper owning a newly allocated AsyncTaskThread for the given manager.
 *
 * Delegates to the (processor, manager) constructor.
 *
 * @param[in] asyncTaskManager Manager passed to the new thread object and stored by the helper.
 */
AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
: TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
{
}
359
/**
 * @brief Move constructor: takes over the thread object owned by rhs and refers to the same manager.
 *
 * @param[in] rhs Helper to move from; its mProcessor is moved out.
 */
AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
: TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
{
}
364
/**
 * @brief Target constructor used by the other TaskHelper constructors.
 *
 * @param[in] processor Thread object whose ownership is transferred to this helper.
 * @param[in] asyncTaskManager Manager this helper refers to.
 */
AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
: mProcessor(std::move(processor)),
  mAsyncTaskManager(asyncTaskManager)
{
}
370
/**
 * @brief Forwards the request to the owned AsyncTaskThread.
 *
 * @return The result of AsyncTaskThread::Request(): true if the thread was idle and has been notified.
 */
bool AsyncTaskManager::TaskHelper::Request()
{
  return mProcessor->Request();
}
375 } // namespace Adaptor
376
377 } // namespace Internal
378
379 } // namespace Dali