[Tizen] Revert "AsyncTaskManager overhead reduce"
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 // CLASS HEADER
19 #include <dali/internal/system/common/async-task-manager-impl.h>
20
21 // EXTERNAL INCLUDES
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
27
28 namespace Dali
29 {
30 namespace Internal
31 {
32 namespace Adaptor
33 {
34 namespace
35 {
// Default size of the worker thread pool, and the environment variable that can override it.
constexpr size_t      DEFAULT_NUMBER_OF_ASYNC_THREADS = 8u;
constexpr const char* NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";

// Default number of low priority tasks allowed to run at once, and the environment variable that can override it.
constexpr size_t      DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = 6u;
constexpr const char* NUMBER_OF_LOW_PRIORITY_THREADS_ENV     = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
42
43 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
44 {
45   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
46   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
47   constexpr auto MAX_NUMBER_OF_THREADS = 10u;
48   DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
49   return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
50 }
51
52 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
53 {
54   auto numberString    = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
55   auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
56   DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
57   return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
58 }
59
#ifdef DEBUG_ENABLED
// Log filter for this file, created with logging disabled; only compiled when DEBUG_ENABLED is defined.
Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
#endif
63
64 } // unnamed namespace
65
66 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
67 : mConditionalWait(),
68   mAsyncTaskManager(asyncTaskManager),
69   mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
70   mDestroyThread(false),
71   mIsThreadStarted(false),
72   mIsThreadIdle(true)
73 {
74 }
75
76 AsyncTaskThread::~AsyncTaskThread()
77 {
78   // Stop the thread
79   {
80     ConditionalWait::ScopedLock lock(mConditionalWait);
81     mDestroyThread = true;
82     mConditionalWait.Notify(lock);
83   }
84
85   Join();
86 }
87
88 bool AsyncTaskThread::Request()
89 {
90   if(!mIsThreadStarted)
91   {
92     Start();
93     mIsThreadStarted = true;
94   }
95
96   {
97     // Lock while adding task to the queue
98     ConditionalWait::ScopedLock lock(mConditionalWait);
99
100     if(mIsThreadIdle)
101     {
102       mIsThreadIdle = false;
103
104       // wake up the thread
105       mConditionalWait.Notify(lock);
106       return true;
107     }
108   }
109
110   return false;
111 }
112
113 void AsyncTaskThread::Run()
114 {
115   SetThreadName("AsyncTaskThread");
116   mLogFactory.InstallLogFunction();
117
118   while(!mDestroyThread)
119   {
120     AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
121     if(!task)
122     {
123       ConditionalWait::ScopedLock lock(mConditionalWait);
124       if(!mDestroyThread)
125       {
126         mIsThreadIdle = true;
127         mConditionalWait.Wait(lock);
128       }
129     }
130     else
131     {
132       task->Process();
133       mAsyncTaskManager.CompleteTask(task);
134     }
135   }
136 }
137
138 Dali::AsyncTaskManager AsyncTaskManager::Get()
139 {
140   Dali::AsyncTaskManager manager;
141   SingletonService       singletonService(SingletonService::Get());
142   if(singletonService)
143   {
144     // Check whether the async task manager is already created
145     Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
146     if(handle)
147     {
148       // If so, downcast the handle of singleton
149       manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
150     }
151
152     if(!manager)
153     {
154       // If not, create the async task manager and register it as a singleton
155       Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
156       manager                                                       = Dali::AsyncTaskManager(internalAsyncTaskManager);
157       singletonService.Register(typeid(manager), manager);
158     }
159   }
160   return manager;
161 }
162
163 AsyncTaskManager::AsyncTaskManager()
164 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
165   mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
166   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
167   mProcessorRegistered(false)
168 {
169 }
170
171 AsyncTaskManager::~AsyncTaskManager()
172 {
173   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
174   {
175     Dali::Adaptor::Get().UnregisterProcessor(*this);
176   }
177
178   mTasks.Clear();
179 }
180
181 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
182 {
183   if(task)
184   {
185     // Lock while adding task to the queue
186     Mutex::ScopedLock lock(mMutex);
187
188     mWaitingTasks.push_back(task);
189
190     // Finish all Running threads are working
191     if(mRunningTasks.size() >= mTasks.GetElementCount())
192     {
193       return;
194     }
195   }
196
197   size_t count = mTasks.GetElementCount();
198   size_t index = 0;
199   while(index++ < count)
200   {
201     auto processHelperIt = mTasks.GetNext();
202     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
203     if(processHelperIt->Request())
204     {
205       break;
206     }
207     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
208   }
209
210   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
211   {
212     Dali::Adaptor::Get().RegisterProcessor(*this);
213     mProcessorRegistered = true;
214   }
215
216   return;
217 }
218
219 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
220 {
221   if(task)
222   {
223     // Lock while remove task from the queue
224     Mutex::ScopedLock lock(mMutex);
225
226     if(!mWaitingTasks.empty())
227     {
228       for(std::vector<AsyncTaskPtr>::iterator it = mWaitingTasks.begin(); it != mWaitingTasks.end();)
229       {
230         if((*it) && (*it) == task)
231         {
232           it = mWaitingTasks.erase(it);
233         }
234         else
235         {
236           it++;
237         }
238       }
239     }
240
241     if(!mRunningTasks.empty())
242     {
243       for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
244       {
245         if((*iter).first == task)
246         {
247           (*iter).second = true;
248         }
249       }
250     }
251
252     if(!mCompletedTasks.empty())
253     {
254       for(std::vector<AsyncTaskPtr>::iterator it = mCompletedTasks.begin(); it != mCompletedTasks.end();)
255       {
256         if((*it) && (*it) == task)
257         {
258           it = mCompletedTasks.erase(it);
259         }
260         else
261         {
262           it++;
263         }
264       }
265     }
266   }
267
268   UnregisterProcessor();
269 }
270
271 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
272 {
273   // Lock while popping task out from the queue
274   Mutex::ScopedLock lock(mMutex);
275
276   // pop out the next task from the queue
277   AsyncTaskPtr nextTask = nullptr;
278
279   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
280   {
281     if((*iter)->IsReady())
282     {
283       const auto priorityType  = (*iter)->GetPriorityType();
284       const bool taskAvaliable = (priorityType == AsyncTask::PriorityType::HIGH) || // Task always valid if it's priority is high
285                                  (mAvaliableLowPriorityTaskCounts > 0u);            // or priority is low, but we can use it.
286
287       if(taskAvaliable)
288       {
289         nextTask = *iter;
290
291         // Add Running queue
292         mRunningTasks.push_back(std::make_pair(nextTask, false));
293         mWaitingTasks.erase(iter);
294
295         // Decrease avaliable task counts if it is low priority
296         if(priorityType == AsyncTask::PriorityType::LOW)
297         {
298           --mAvaliableLowPriorityTaskCounts;
299         }
300         break;
301       }
302     }
303   }
304
305   return nextTask;
306 }
307
308 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
309 {
310   // Lock while popping task out from the queue
311   Mutex::ScopedLock lock(mMutex);
312
313   if(mCompletedTasks.empty())
314   {
315     return AsyncTaskPtr();
316   }
317
318   std::vector<AsyncTaskPtr>::iterator next     = mCompletedTasks.begin();
319   AsyncTaskPtr                        nextTask = *next;
320   mCompletedTasks.erase(next);
321
322   return nextTask;
323 }
324
325 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
326 {
327   // Lock while adding task to the queue
328   {
329     Mutex::ScopedLock lock(mMutex);
330
331     for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
332     {
333       if((*iter).first == task)
334       {
335         if(!(*iter).second)
336         {
337           if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
338           {
339             mCompletedTasks.push_back(task);
340           }
341         }
342
343         // Delete this task in running queue
344         mRunningTasks.erase(iter);
345
346         // Increase avaliable task counts if it is low priority
347         const auto priorityType = task->GetPriorityType();
348         if(priorityType == AsyncTask::PriorityType::LOW)
349         {
350           ++mAvaliableLowPriorityTaskCounts;
351         }
352         break;
353       }
354     }
355   }
356
357   // wake up the main thread
358   if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
359   {
360     mTrigger->Trigger();
361   }
362   else
363   {
364     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
365   }
366 }
367
368 void AsyncTaskManager::UnregisterProcessor()
369 {
370   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
371   {
372     Mutex::ScopedLock lock(mMutex);
373     if(mWaitingTasks.empty() && mCompletedTasks.empty() && mRunningTasks.empty())
374     {
375       Dali::Adaptor::Get().UnregisterProcessor(*this);
376       mProcessorRegistered = false;
377     }
378   }
379 }
380
381 void AsyncTaskManager::TasksCompleted()
382 {
383   while(AsyncTaskPtr task = PopNextCompletedTask())
384   {
385     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
386   }
387
388   UnregisterProcessor();
389 }
390
391 void AsyncTaskManager::Process(bool postProcessor)
392 {
393   TasksCompleted();
394 }
395
396 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
397 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
398 {
399 }
400
401 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
402 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
403 {
404 }
405
406 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
407 : mProcessor(std::move(processor)),
408   mAsyncTaskManager(asyncTaskManager)
409 {
410 }
411
412 bool AsyncTaskManager::TaskHelper::Request()
413 {
414   return mProcessor->Request();
415 }
416 } // namespace Adaptor
417
418 } // namespace Internal
419
420 } // namespace Dali