Revert "[Tizen] Add TasksCompleted signal at AsyncTaskManager"
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
// CLASS HEADER
#include <dali/internal/system/common/async-task-manager-impl.h>

// EXTERNAL INCLUDES
#include <dali/devel-api/adaptor-framework/environment-variable.h>
#include <dali/devel-api/adaptor-framework/thread-settings.h>
#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>

#include <atomic>
#include <unordered_map>
29
30 namespace Dali
31 {
32 namespace Internal
33 {
34 namespace Adaptor
35 {
36 namespace
37 {
/// Once this many completed tasks are queued, the main thread is triggered even if none of them requires a main-thread callback.
constexpr unsigned int FORCE_TRIGGER_THRESHOLD{128u};

// Default size of the worker thread pool, and the environment variable that can override it.
constexpr size_t      DEFAULT_NUMBER_OF_ASYNC_THREADS{8u};
constexpr const char* NUMBER_OF_ASYNC_THREADS_ENV{"DALI_ASYNC_MANAGER_THREAD_POOL_SIZE"};

// Default number of threads that may run low priority tasks at once, and the environment variable that can override it.
constexpr size_t      DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS{6u};
constexpr const char* NUMBER_OF_LOW_PRIORITY_THREADS_ENV{"DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE"};
46
47 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
48 {
49   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
50   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
51   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
52   DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
53   return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
54 }
55
56 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
57 {
58   auto numberString    = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
59   auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
60   DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
61   return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
62 }
63
#if defined(DEBUG_ENABLED)
/// Log filter used by every DALI_LOG_INFO call in this file.
Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");

/// Debug-only counter used to give each worker thread a distinct id.
/// Atomic because every AsyncTaskThread::Run() increments it from its own thread.
std::atomic<uint32_t> gThreadId{0u};
#endif
69
70 } // unnamed namespace
71
72 // AsyncTaskThread
73
74 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
75 : mConditionalWait(),
76   mAsyncTaskManager(asyncTaskManager),
77   mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
78   mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
79   mDestroyThread(false),
80   mIsThreadStarted(false),
81   mIsThreadIdle(true)
82 {
83 }
84
85 AsyncTaskThread::~AsyncTaskThread()
86 {
87   // Stop the thread
88   {
89     ConditionalWait::ScopedLock lock(mConditionalWait);
90     mDestroyThread = true;
91     mConditionalWait.Notify(lock);
92   }
93
94   Join();
95 }
96
97 bool AsyncTaskThread::Request()
98 {
99   if(!mIsThreadStarted)
100   {
101     Start();
102     mIsThreadStarted = true;
103   }
104
105   {
106     // Lock while adding task to the queue
107     ConditionalWait::ScopedLock lock(mConditionalWait);
108
109     if(mIsThreadIdle)
110     {
111       mIsThreadIdle = false;
112
113       // wake up the thread
114       mConditionalWait.Notify(lock);
115       return true;
116     }
117   }
118
119   return false;
120 }
121
122 void AsyncTaskThread::Run()
123 {
124 #if defined(DEBUG_ENABLED)
125   uint32_t threadId = gThreadId++;
126   {
127     char temp[100];
128     snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
129     SetThreadName(temp);
130   }
131 #else
132   SetThreadName("AsyncTaskThread");
133 #endif
134   mLogFactory.InstallLogFunction();
135   mTraceFactory.InstallTraceFunction();
136
137   while(!mDestroyThread)
138   {
139     AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
140     if(!task)
141     {
142       ConditionalWait::ScopedLock lock(mConditionalWait);
143       if(!mDestroyThread)
144       {
145         mIsThreadIdle = true;
146         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
147         mConditionalWait.Wait(lock);
148         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
149       }
150     }
151     else
152     {
153       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
154       task->Process();
155       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
156       if(!mDestroyThread)
157       {
158         mAsyncTaskManager.CompleteTask(std::move(task));
159       }
160     }
161   }
162 }
163
164 // AsyncTaskManager::CacheImpl
165
166 struct AsyncTaskManager::CacheImpl
167 {
168   CacheImpl(AsyncTaskManager& manager)
169   : mManager(manager)
170   {
171   }
172
173 public:
174   // Insert / Erase task cache API.
175
176   /**
177    * @brief Insert cache that input task.
178    * @pre Mutex be locked.
179    */
180   template<typename CacheContainer, typename Iterator>
181   static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
182   {
183     auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
184     cacheContainer.insert(cacheContainer.end(), iterator);
185   }
186
187   /**
188    * @brief Erase cache that input task.
189    * @pre Mutex be locked.
190    */
191   template<typename CacheContainer, typename Iterator>
192   static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
193   {
194     auto mapIter = cacheMap.find(task.Get());
195     if(mapIter != cacheMap.end())
196     {
197       auto& cacheContainer = (*mapIter).second;
198       auto  cacheIter      = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
199
200       if(cacheIter != cacheContainer.end())
201       {
202         cacheContainer.erase(cacheIter);
203         if(cacheContainer.empty())
204         {
205           cacheMap.erase(mapIter);
206         }
207       }
208     }
209   }
210
211   /**
212    * @brief Erase all cache that input task.
213    * @pre Mutex be locked.
214    */
215   template<typename CacheContainer>
216   static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
217   {
218     auto mapIter = cacheMap.find(task.Get());
219     if(mapIter != cacheMap.end())
220     {
221       cacheMap.erase(mapIter);
222     }
223   }
224
225 public:
226   AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
227
228   // Keep cache iterators as list since we take tasks by FIFO as default.
229   using TaskCacheContainer          = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
230   using RunningTaskCacheContainer   = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
231   using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
232
233   TaskCacheContainer          mWaitingTasksCache;   ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
234   RunningTaskCacheContainer   mRunningTasksCache;   ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
235   CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
236 };
237
238 // AsyncTaskManager
239
240 Dali::AsyncTaskManager AsyncTaskManager::Get()
241 {
242   Dali::AsyncTaskManager manager;
243   SingletonService       singletonService(SingletonService::Get());
244   if(singletonService)
245   {
246     // Check whether the async task manager is already created
247     Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
248     if(handle)
249     {
250       // If so, downcast the handle of singleton
251       manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
252     }
253
254     if(!manager)
255     {
256       // If not, create the async task manager and register it as a singleton
257       Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
258       manager                                                       = Dali::AsyncTaskManager(internalAsyncTaskManager);
259       singletonService.Register(typeid(manager), manager);
260     }
261   }
262   return manager;
263 }
264
265 AsyncTaskManager::AsyncTaskManager()
266 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
267   mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
268   mWaitingHighProirityTaskCounts(0u),
269   mCacheImpl(new CacheImpl(*this)),
270   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
271   mProcessorRegistered(false)
272 {
273 }
274
275 AsyncTaskManager::~AsyncTaskManager()
276 {
277   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
278   {
279     mProcessorRegistered = false;
280     Dali::Adaptor::Get().UnregisterProcessor(*this);
281   }
282
283   // Join all threads.
284   mTasks.Clear();
285
286   // Remove cache impl after all threads are join.
287   mCacheImpl.reset();
288
289   // Remove tasks after CacheImpl removed
290   mWaitingTasks.clear();
291   mRunningTasks.clear();
292   mCompletedTasks.clear();
293 }
294
295 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
296 {
297   if(task)
298   {
299     // Lock while adding task to the queue
300     Mutex::ScopedLock lock(mWaitingTasksMutex);
301
302     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
303
304     // push back into waiting queue.
305     auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
306     CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
307
308     if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
309     {
310       // Increase the number of waiting tasks for high priority.
311       ++mWaitingHighProirityTaskCounts;
312     }
313
314     {
315       // For thread safety
316       Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
317
318       // Finish all Running threads are working
319       if(mRunningTasks.size() >= mTasks.GetElementCount())
320       {
321         return;
322       }
323     }
324   }
325
326   size_t count = mTasks.GetElementCount();
327   size_t index = 0;
328   while(index++ < count)
329   {
330     auto processHelperIt = mTasks.GetNext();
331     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
332     if(processHelperIt->Request())
333     {
334       break;
335     }
336     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
337   }
338
339   // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
340   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
341   {
342     Dali::Adaptor::Get().RegisterProcessor(*this);
343     mProcessorRegistered = true;
344   }
345
346   return;
347 }
348
349 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
350 {
351   if(task)
352   {
353     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
354
355     // Check whether we need to unregister processor.
356     // If there is some non-empty queue exist, we don't need to unregister processor.
357     bool needCheckUnregisterProcessor = true;
358
359     {
360       // Lock while remove task from the queue
361       Mutex::ScopedLock lock(mWaitingTasksMutex);
362
363       auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
364       if(mapIter != mCacheImpl->mWaitingTasksCache.end())
365       {
366         for(auto& iterator : mapIter->second)
367         {
368           DALI_ASSERT_DEBUG((*iterator) == task);
369           if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
370           {
371             // Decrease the number of waiting tasks for high priority.
372             --mWaitingHighProirityTaskCounts;
373           }
374           mWaitingTasks.erase(iterator);
375         }
376         CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
377       }
378
379       if(!mWaitingTasks.empty())
380       {
381         needCheckUnregisterProcessor = false;
382       }
383     }
384
385     {
386       // Lock while remove task from the queue
387       Mutex::ScopedLock lock(mRunningTasksMutex);
388
389       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
390       if(mapIter != mCacheImpl->mRunningTasksCache.end())
391       {
392         for(auto& iterator : mapIter->second)
393         {
394           DALI_ASSERT_DEBUG((*iterator).first == task);
395           // We cannot erase container. Just mark as canceled.
396           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
397           (*iterator).second = RunningTaskState::CANCELED;
398         }
399       }
400
401       if(!mRunningTasks.empty())
402       {
403         needCheckUnregisterProcessor = false;
404       }
405     }
406
407     {
408       // Lock while remove task from the queue
409       Mutex::ScopedLock lock(mCompletedTasksMutex);
410
411       auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
412       if(mapIter != mCacheImpl->mCompletedTasksCache.end())
413       {
414         for(auto& iterator : mapIter->second)
415         {
416           DALI_ASSERT_DEBUG(iterator->first == task);
417           mCompletedTasks.erase(iterator);
418         }
419         CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
420       }
421
422       if(!mCompletedTasks.empty())
423       {
424         needCheckUnregisterProcessor = false;
425       }
426     }
427
428     // UnregisterProcessor required to lock mutex. Call this API only if required.
429     if(needCheckUnregisterProcessor)
430     {
431       UnregisterProcessor();
432     }
433   }
434 }
435
436 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
437 {
438   std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
439
440   AsyncTaskPtr nextCompletedTask = nullptr;
441   {
442     // Lock while popping task out from the queue
443     Mutex::ScopedLock lock(mCompletedTasksMutex);
444
445     while(!mCompletedTasks.empty())
446     {
447       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
448
449       auto               next      = mCompletedTasks.begin();
450       AsyncTaskPtr       nextTask  = next->first;
451       CompletedTaskState taskState = next->second;
452       CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
453       mCompletedTasks.erase(next);
454
455       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
456
457       if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
458       {
459         nextCompletedTask = nextTask;
460         break;
461       }
462
463       ignoredTaskList.push_back(nextTask);
464     }
465
466     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
467   }
468
469   return nextCompletedTask;
470 }
471
472 void AsyncTaskManager::UnregisterProcessor()
473 {
474   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
475   {
476     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
477     // Keep processor at least 1 task exist.
478     // Please be careful the order of mutex, to avoid dead lock.
479     // TODO : Should we lock all mutex rightnow?
480     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
481     if(mWaitingTasks.empty())
482     {
483       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
484       if(mRunningTasks.empty())
485       {
486         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
487         if(mCompletedTasks.empty())
488         {
489           mProcessorRegistered = false;
490           Dali::Adaptor::Get().UnregisterProcessor(*this);
491         }
492       }
493     }
494     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
495   }
496 }
497
498 void AsyncTaskManager::TasksCompleted()
499 {
500   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
501   while(AsyncTaskPtr task = PopNextCompletedTask())
502   {
503     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
504     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
505   }
506
507   UnregisterProcessor();
508   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
509 }
510
511 void AsyncTaskManager::Process(bool postProcessor)
512 {
513   TasksCompleted();
514 }
515
516 /// Worker thread called
517 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
518 {
519   // Lock while popping task out from the queue
520   Mutex::ScopedLock lock(mWaitingTasksMutex);
521
522   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
523
524   // pop out the next task from the queue
525   AsyncTaskPtr nextTask = nullptr;
526
527   // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
528   if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
529   {
530     // For thread safety
531     Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
532
533     if(mAvaliableLowPriorityTaskCounts == 0u)
534     {
535       // There are no avaliabe tasks to run now. Return nullptr.
536       return nextTask;
537     }
538   }
539
540   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
541   {
542     if((*iter)->IsReady())
543     {
544       const auto priorityType  = (*iter)->GetPriorityType();
545       bool       taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
546       if(!taskAvaliable)
547       {
548         // For thread safety
549         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
550
551         taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
552       }
553
554       if(taskAvaliable)
555       {
556         nextTask = *iter;
557
558         // Add Running queue
559         {
560           // Lock while popping task out from the queue
561           Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
562
563           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
564
565           auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
566           CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
567
568           CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
569           mWaitingTasks.erase(iter);
570
571           // Decrease avaliable task counts if it is low priority
572           if(priorityType == AsyncTask::PriorityType::LOW)
573           {
574             // We are under running task mutex. We can decrease it.
575             --mAvaliableLowPriorityTaskCounts;
576           }
577         }
578
579         if(priorityType == AsyncTask::PriorityType::HIGH)
580         {
581           // Decrease the number of waiting tasks for high priority.
582           --mWaitingHighProirityTaskCounts;
583         }
584         break;
585       }
586     }
587   }
588
589   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
590
591   return nextTask;
592 }
593
594 /// Worker thread called
595 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
596 {
597   bool notify = false;
598
599   if(task)
600   {
601     bool needTrigger = false;
602
603     // Lock while check validation of task.
604     {
605       Mutex::ScopedLock lock(mRunningTasksMutex);
606
607       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
608       if(mapIter != mCacheImpl->mRunningTasksCache.end())
609       {
610         const auto cacheIter = mapIter->second.begin();
611         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
612
613         const auto iter = *cacheIter;
614         DALI_ASSERT_DEBUG(iter->first == task);
615         if(iter->second == RunningTaskState::RUNNING)
616         {
617           // This task is valid.
618           notify = true;
619         }
620       }
621
622       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
623     }
624
625     // We should execute this tasks complete callback out of mutex
626     if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
627     {
628       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
629       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
630     }
631
632     // Lock while adding task to the queue
633     {
634       Mutex::ScopedLock lock(mRunningTasksMutex);
635
636       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
637       if(mapIter != mCacheImpl->mRunningTasksCache.end())
638       {
639         const auto cacheIter = mapIter->second.begin();
640         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
641
642         const auto iter         = *cacheIter;
643         const auto priorityType = iter->first->GetPriorityType();
644         // Increase avaliable task counts if it is low priority
645         if(priorityType == AsyncTask::PriorityType::LOW)
646         {
647           // We are under running task mutex. We can increase it.
648           ++mAvaliableLowPriorityTaskCounts;
649         }
650
651         // Move task into completed, for ensure that AsyncTask destroy at main thread.
652         {
653           Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
654
655           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
656
657           needTrigger |= callbackRequired;
658
659           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
660
661           auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
662           CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
663
664           CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
665           mRunningTasks.erase(iter);
666
667           if(!needTrigger)
668           {
669             needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
670           }
671
672           // Now, task is invalidate.
673           task.Reset();
674         }
675       }
676     }
677
678     // Wake up the main thread
679     if(needTrigger)
680     {
681       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
682       mTrigger->Trigger();
683     }
684   }
685 }
686
687 // AsyncTaskManager::TaskHelper
688
689 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
690 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
691 {
692 }
693
694 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
695 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
696 {
697 }
698
699 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
700 : mProcessor(std::move(processor)),
701   mAsyncTaskManager(asyncTaskManager)
702 {
703 }
704
705 bool AsyncTaskManager::TaskHelper::Request()
706 {
707   return mProcessor->Request();
708 }
709 } // namespace Adaptor
710
711 } // namespace Internal
712
713 } // namespace Dali