Remove AsyncTaskManager CacheImpl before all tasks removed
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
// CLASS HEADER
#include <dali/internal/system/common/async-task-manager-impl.h>

// EXTERNAL INCLUDES
#include <dali/devel-api/adaptor-framework/environment-variable.h>
#include <dali/devel-api/adaptor-framework/thread-settings.h>
#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>

#include <atomic>
#include <unordered_map>
29
30 namespace Dali
31 {
32 namespace Internal
33 {
34 namespace Adaptor
35 {
36 namespace
37 {
// Default size of the worker thread pool, and the environment variable that overrides it.
constexpr size_t      DEFAULT_NUMBER_OF_ASYNC_THREADS{8u};
constexpr const char* NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";

// Default number of threads for low priority tasks, and the environment variable that overrides it.
constexpr size_t      DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS{6u};
constexpr const char* NUMBER_OF_LOW_PRIORITY_THREADS_ENV = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
44
45 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
46 {
47   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
48   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
49   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
50   DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
51   return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
52 }
53
54 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
55 {
56   auto numberString    = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
57   auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
58   DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
59   return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
60 }
61
#if defined(DEBUG_ENABLED)
Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");

// Debug-only counter that hands each worker thread a distinct id for its thread name and log lines.
// Every worker thread increments it from its own Run(), so it has to be atomic to avoid a data race.
std::atomic<uint32_t> gThreadId{0u};
#endif
67
68 } // unnamed namespace
69
70 // AsyncTaskThread
71
72 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
73 : mConditionalWait(),
74   mAsyncTaskManager(asyncTaskManager),
75   mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
76   mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
77   mDestroyThread(false),
78   mIsThreadStarted(false),
79   mIsThreadIdle(true)
80 {
81 }
82
83 AsyncTaskThread::~AsyncTaskThread()
84 {
85   // Stop the thread
86   {
87     ConditionalWait::ScopedLock lock(mConditionalWait);
88     mDestroyThread = true;
89     mConditionalWait.Notify(lock);
90   }
91
92   Join();
93 }
94
95 bool AsyncTaskThread::Request()
96 {
97   if(!mIsThreadStarted)
98   {
99     Start();
100     mIsThreadStarted = true;
101   }
102
103   {
104     // Lock while adding task to the queue
105     ConditionalWait::ScopedLock lock(mConditionalWait);
106
107     if(mIsThreadIdle)
108     {
109       mIsThreadIdle = false;
110
111       // wake up the thread
112       mConditionalWait.Notify(lock);
113       return true;
114     }
115   }
116
117   return false;
118 }
119
120 void AsyncTaskThread::Run()
121 {
122 #if defined(DEBUG_ENABLED)
123   uint32_t threadId = gThreadId++;
124   {
125     char temp[100];
126     snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
127     SetThreadName(temp);
128   }
129 #else
130   SetThreadName("AsyncTaskThread");
131 #endif
132   mLogFactory.InstallLogFunction();
133   mTraceFactory.InstallTraceFunction();
134
135   while(!mDestroyThread)
136   {
137     AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
138     if(!task)
139     {
140       ConditionalWait::ScopedLock lock(mConditionalWait);
141       if(!mDestroyThread)
142       {
143         mIsThreadIdle = true;
144         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
145         mConditionalWait.Wait(lock);
146         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
147       }
148     }
149     else
150     {
151       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
152       task->Process();
153       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
154       if(!mDestroyThread)
155       {
156         mAsyncTaskManager.CompleteTask(std::move(task));
157       }
158     }
159   }
160 }
161
162 // AsyncTaskManager::CacheImpl
163
164 struct AsyncTaskManager::CacheImpl
165 {
166   CacheImpl(AsyncTaskManager& manager)
167   : mManager(manager)
168   {
169   }
170
171 public:
172   // Insert / Erase task cache API.
173
174   /**
175    * @brief Insert cache that input task.
176    * @pre Mutex be locked.
177    */
178   template<typename CacheContainer, typename Iterator>
179   static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
180   {
181     auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
182     cacheContainer.insert(cacheContainer.end(), iterator);
183   }
184
185   /**
186    * @brief Erase cache that input task.
187    * @pre Mutex be locked.
188    */
189   template<typename CacheContainer, typename Iterator>
190   static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
191   {
192     auto mapIter = cacheMap.find(task.Get());
193     if(mapIter != cacheMap.end())
194     {
195       auto& cacheContainer = (*mapIter).second;
196       auto  cacheIter      = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
197
198       if(cacheIter != cacheContainer.end())
199       {
200         cacheContainer.erase(cacheIter);
201         if(cacheContainer.empty())
202         {
203           cacheMap.erase(mapIter);
204         }
205       }
206     }
207   }
208
209   /**
210    * @brief Erase all cache that input task.
211    * @pre Mutex be locked.
212    */
213   template<typename CacheContainer>
214   static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
215   {
216     auto mapIter = cacheMap.find(task.Get());
217     if(mapIter != cacheMap.end())
218     {
219       cacheMap.erase(mapIter);
220     }
221   }
222
223 public:
224   AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
225
226   // Keep cache iterators as list since we take tasks by FIFO as default.
227   using TaskCacheContainer          = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
228   using RunningTaskCacheContainer   = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
229   using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
230
231   TaskCacheContainer          mWaitingTasksCache;   ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
232   RunningTaskCacheContainer   mRunningTasksCache;   ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
233   CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
234 };
235
236 // AsyncTaskManager
237
238 Dali::AsyncTaskManager AsyncTaskManager::Get()
239 {
240   Dali::AsyncTaskManager manager;
241   SingletonService       singletonService(SingletonService::Get());
242   if(singletonService)
243   {
244     // Check whether the async task manager is already created
245     Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
246     if(handle)
247     {
248       // If so, downcast the handle of singleton
249       manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
250     }
251
252     if(!manager)
253     {
254       // If not, create the async task manager and register it as a singleton
255       Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
256       manager                                                       = Dali::AsyncTaskManager(internalAsyncTaskManager);
257       singletonService.Register(typeid(manager), manager);
258     }
259   }
260   return manager;
261 }
262
263 AsyncTaskManager::AsyncTaskManager()
264 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
265   mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
266   mWaitingHighProirityTaskCounts(0u),
267   mCacheImpl(new CacheImpl(*this)),
268   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
269   mProcessorRegistered(false)
270 {
271 }
272
273 AsyncTaskManager::~AsyncTaskManager()
274 {
275   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
276   {
277     mProcessorRegistered = false;
278     Dali::Adaptor::Get().UnregisterProcessor(*this);
279   }
280
281   // Join all threads.
282   mTasks.Clear();
283
284   // Remove cache impl after all threads are join.
285   mCacheImpl.reset();
286
287   // Remove tasks after CacheImpl removed
288   mWaitingTasks.clear();
289   mRunningTasks.clear();
290   mCompletedTasks.clear();
291 }
292
293 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
294 {
295   if(task)
296   {
297     // Lock while adding task to the queue
298     Mutex::ScopedLock lock(mWaitingTasksMutex);
299
300     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
301
302     // push back into waiting queue.
303     auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
304     CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
305
306     if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
307     {
308       // Increase the number of waiting tasks for high priority.
309       ++mWaitingHighProirityTaskCounts;
310     }
311
312     {
313       // For thread safety
314       Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
315
316       // Finish all Running threads are working
317       if(mRunningTasks.size() >= mTasks.GetElementCount())
318       {
319         return;
320       }
321     }
322   }
323
324   size_t count = mTasks.GetElementCount();
325   size_t index = 0;
326   while(index++ < count)
327   {
328     auto processHelperIt = mTasks.GetNext();
329     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
330     if(processHelperIt->Request())
331     {
332       break;
333     }
334     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
335   }
336
337   // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
338   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
339   {
340     Dali::Adaptor::Get().RegisterProcessor(*this);
341     mProcessorRegistered = true;
342   }
343
344   return;
345 }
346
347 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
348 {
349   if(task)
350   {
351     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
352
353     // Check whether we need to unregister processor.
354     // If there is some non-empty queue exist, we don't need to unregister processor.
355     bool needCheckUnregisterProcessor = true;
356
357     {
358       // Lock while remove task from the queue
359       Mutex::ScopedLock lock(mWaitingTasksMutex);
360
361       auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
362       if(mapIter != mCacheImpl->mWaitingTasksCache.end())
363       {
364         for(auto& iterator : mapIter->second)
365         {
366           DALI_ASSERT_DEBUG((*iterator) == task);
367           if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
368           {
369             // Decrease the number of waiting tasks for high priority.
370             --mWaitingHighProirityTaskCounts;
371           }
372           mWaitingTasks.erase(iterator);
373         }
374         CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
375       }
376
377       if(!mWaitingTasks.empty())
378       {
379         needCheckUnregisterProcessor = false;
380       }
381     }
382
383     {
384       // Lock while remove task from the queue
385       Mutex::ScopedLock lock(mRunningTasksMutex);
386
387       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
388       if(mapIter != mCacheImpl->mRunningTasksCache.end())
389       {
390         for(auto& iterator : mapIter->second)
391         {
392           DALI_ASSERT_DEBUG((*iterator).first == task);
393           // We cannot erase container. Just mark as canceled.
394           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
395           (*iterator).second = RunningTaskState::CANCELED;
396         }
397       }
398
399       if(!mRunningTasks.empty())
400       {
401         needCheckUnregisterProcessor = false;
402       }
403     }
404
405     {
406       // Lock while remove task from the queue
407       Mutex::ScopedLock lock(mCompletedTasksMutex);
408
409       auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
410       if(mapIter != mCacheImpl->mCompletedTasksCache.end())
411       {
412         for(auto& iterator : mapIter->second)
413         {
414           DALI_ASSERT_DEBUG(iterator->first == task);
415           mCompletedTasks.erase(iterator);
416         }
417         CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
418       }
419
420       if(!mCompletedTasks.empty())
421       {
422         needCheckUnregisterProcessor = false;
423       }
424     }
425
426     // UnregisterProcessor required to lock mutex. Call this API only if required.
427     if(needCheckUnregisterProcessor)
428     {
429       UnregisterProcessor();
430     }
431   }
432 }
433
434 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
435 {
436   // Lock while popping task out from the queue
437   Mutex::ScopedLock lock(mCompletedTasksMutex);
438
439   AsyncTaskPtr nextCompletedTask = nullptr;
440
441   while(!mCompletedTasks.empty())
442   {
443     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
444
445     auto               next      = mCompletedTasks.begin();
446     AsyncTaskPtr       nextTask  = next->first;
447     CompletedTaskState taskState = next->second;
448     CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
449     mCompletedTasks.erase(next);
450
451     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
452
453     if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
454     {
455       nextCompletedTask = nextTask;
456       break;
457     }
458   }
459
460   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
461
462   return nextCompletedTask;
463 }
464
465 void AsyncTaskManager::UnregisterProcessor()
466 {
467   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
468   {
469     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
470     // Keep processor at least 1 task exist.
471     // Please be careful the order of mutex, to avoid dead lock.
472     // TODO : Should we lock all mutex rightnow?
473     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
474     if(mWaitingTasks.empty())
475     {
476       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
477       if(mRunningTasks.empty())
478       {
479         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
480         if(mCompletedTasks.empty())
481         {
482           mProcessorRegistered = false;
483           Dali::Adaptor::Get().UnregisterProcessor(*this);
484         }
485       }
486     }
487     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
488   }
489 }
490
491 void AsyncTaskManager::TasksCompleted()
492 {
493   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
494   while(AsyncTaskPtr task = PopNextCompletedTask())
495   {
496     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
497     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
498   }
499
500   UnregisterProcessor();
501   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
502 }
503
504 void AsyncTaskManager::Process(bool postProcessor)
505 {
506   TasksCompleted();
507 }
508
509 /// Worker thread called
510 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
511 {
512   // Lock while popping task out from the queue
513   Mutex::ScopedLock lock(mWaitingTasksMutex);
514
515   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
516
517   // pop out the next task from the queue
518   AsyncTaskPtr nextTask = nullptr;
519
520   // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
521   if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
522   {
523     // For thread safety
524     Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
525
526     if(mAvaliableLowPriorityTaskCounts == 0u)
527     {
528       // There are no avaliabe tasks to run now. Return nullptr.
529       return nextTask;
530     }
531   }
532
533   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
534   {
535     if((*iter)->IsReady())
536     {
537       const auto priorityType  = (*iter)->GetPriorityType();
538       bool       taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
539       if(!taskAvaliable)
540       {
541         // For thread safety
542         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
543
544         taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
545       }
546
547       if(taskAvaliable)
548       {
549         nextTask = *iter;
550
551         // Add Running queue
552         {
553           // Lock while popping task out from the queue
554           Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
555
556           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
557
558           auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
559           CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
560
561           CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
562           mWaitingTasks.erase(iter);
563
564           // Decrease avaliable task counts if it is low priority
565           if(priorityType == AsyncTask::PriorityType::LOW)
566           {
567             // We are under running task mutex. We can decrease it.
568             --mAvaliableLowPriorityTaskCounts;
569           }
570         }
571
572         if(priorityType == AsyncTask::PriorityType::HIGH)
573         {
574           // Decrease the number of waiting tasks for high priority.
575           --mWaitingHighProirityTaskCounts;
576         }
577         break;
578       }
579     }
580   }
581
582   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
583
584   return nextTask;
585 }
586
587 /// Worker thread called
588 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
589 {
590   bool notify = false;
591
592   if(task)
593   {
594     const bool needTrigger = task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD;
595
596     // Lock while check validation of task.
597     {
598       Mutex::ScopedLock lock(mRunningTasksMutex);
599
600       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
601       if(mapIter != mCacheImpl->mRunningTasksCache.end())
602       {
603         const auto cacheIter = mapIter->second.begin();
604         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
605
606         const auto iter = *cacheIter;
607         DALI_ASSERT_DEBUG(iter->first == task);
608         if(iter->second == RunningTaskState::RUNNING)
609         {
610           // This task is valid.
611           notify = true;
612         }
613       }
614
615       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
616     }
617
618     // We should execute this tasks complete callback out of mutex
619     if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
620     {
621       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
622       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
623     }
624
625     // Lock while adding task to the queue
626     {
627       Mutex::ScopedLock lock(mRunningTasksMutex);
628
629       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
630       if(mapIter != mCacheImpl->mRunningTasksCache.end())
631       {
632         const auto cacheIter = mapIter->second.begin();
633         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
634
635         const auto iter         = *cacheIter;
636         const auto priorityType = iter->first->GetPriorityType();
637         // Increase avaliable task counts if it is low priority
638         if(priorityType == AsyncTask::PriorityType::LOW)
639         {
640           // We are under running task mutex. We can increase it.
641           ++mAvaliableLowPriorityTaskCounts;
642         }
643
644         // Move task into completed, for ensure that AsyncTask destroy at main thread.
645         {
646           Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
647
648           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
649
650           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
651
652           auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
653           CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
654
655           CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
656           mRunningTasks.erase(iter);
657
658           // Now, task is invalidate.
659           task.Reset();
660         }
661       }
662     }
663
664     // Wake up the main thread
665     if(needTrigger)
666     {
667       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
668       mTrigger->Trigger();
669     }
670   }
671 }
672
673 // AsyncTaskManager::TaskHelper
674
675 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
676 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
677 {
678 }
679
680 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
681 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
682 {
683 }
684
685 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
686 : mProcessor(std::move(processor)),
687   mAsyncTaskManager(asyncTaskManager)
688 {
689 }
690
691 bool AsyncTaskManager::TaskHelper::Request()
692 {
693   return mProcessor->Request();
694 }
695 } // namespace Adaptor
696
697 } // namespace Internal
698
699 } // namespace Dali