Merge "Ensure task moved under mutex" into devel/master
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
// CLASS HEADER
#include <dali/internal/system/common/async-task-manager-impl.h>

// EXTERNAL INCLUDES
#include <dali/devel-api/adaptor-framework/environment-variable.h>
#include <dali/devel-api/adaptor-framework/thread-settings.h>
#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>

#include <atomic>
#include <unordered_map>
29
30 namespace Dali
31 {
32 namespace Internal
33 {
34 namespace Adaptor
35 {
36 namespace
37 {
// Default size of the worker thread pool, and the environment variable that can override it.
constexpr size_t      DEFAULT_NUMBER_OF_ASYNC_THREADS = 8u;
constexpr const char* NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";

// Default number of threads usable for low priority tasks, and the environment variable that can override it.
constexpr size_t      DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = 6u;
constexpr const char* NUMBER_OF_LOW_PRIORITY_THREADS_ENV     = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
44
45 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
46 {
47   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
48   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
49   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
50   DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
51   return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
52 }
53
54 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
55 {
56   auto numberString    = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
57   auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
58   DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
59   return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
60 }
61
#if defined(DEBUG_ENABLED)
// Log filter used by every DALI_LOG_INFO call in this file.
Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");

// Only for debug : source of the index used in worker thread names and log lines.
// Atomic because each worker thread post-increments it from its own Run().
std::atomic<uint32_t> gThreadId{0u};
#endif
67
68 } // unnamed namespace
69
70 // AsyncTaskThread
71
72 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
73 : mConditionalWait(),
74   mAsyncTaskManager(asyncTaskManager),
75   mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
76   mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
77   mDestroyThread(false),
78   mIsThreadStarted(false),
79   mIsThreadIdle(true)
80 {
81 }
82
83 AsyncTaskThread::~AsyncTaskThread()
84 {
85   // Stop the thread
86   {
87     ConditionalWait::ScopedLock lock(mConditionalWait);
88     mDestroyThread = true;
89     mConditionalWait.Notify(lock);
90   }
91
92   Join();
93 }
94
95 bool AsyncTaskThread::Request()
96 {
97   if(!mIsThreadStarted)
98   {
99     Start();
100     mIsThreadStarted = true;
101   }
102
103   {
104     // Lock while adding task to the queue
105     ConditionalWait::ScopedLock lock(mConditionalWait);
106
107     if(mIsThreadIdle)
108     {
109       mIsThreadIdle = false;
110
111       // wake up the thread
112       mConditionalWait.Notify(lock);
113       return true;
114     }
115   }
116
117   return false;
118 }
119
120 void AsyncTaskThread::Run()
121 {
122 #if defined(DEBUG_ENABLED)
123   uint32_t threadId = gThreadId++;
124   {
125     char temp[100];
126     snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
127     SetThreadName(temp);
128   }
129 #else
130   SetThreadName("AsyncTaskThread");
131 #endif
132   mLogFactory.InstallLogFunction();
133   mTraceFactory.InstallTraceFunction();
134
135   while(!mDestroyThread)
136   {
137     AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
138     if(!task)
139     {
140       ConditionalWait::ScopedLock lock(mConditionalWait);
141       if(!mDestroyThread)
142       {
143         mIsThreadIdle = true;
144         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
145         mConditionalWait.Wait(lock);
146         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
147       }
148     }
149     else
150     {
151       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
152       task->Process();
153       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
154       if(!mDestroyThread)
155       {
156         mAsyncTaskManager.CompleteTask(std::move(task));
157       }
158     }
159   }
160 }
161
162 // AsyncTaskManager::CacheImpl
163
164 struct AsyncTaskManager::CacheImpl
165 {
166   CacheImpl(AsyncTaskManager& manager)
167   : mManager(manager)
168   {
169   }
170
171 public:
172   // Insert / Erase task cache API.
173
174   /**
175    * @brief Insert cache that input task.
176    * @pre Mutex be locked.
177    */
178   template<typename CacheContainer, typename Iterator>
179   static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
180   {
181     auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
182     cacheContainer.insert(cacheContainer.end(), iterator);
183   }
184
185   /**
186    * @brief Erase cache that input task.
187    * @pre Mutex be locked.
188    */
189   template<typename CacheContainer, typename Iterator>
190   static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
191   {
192     auto mapIter = cacheMap.find(task.Get());
193     if(mapIter != cacheMap.end())
194     {
195       auto& cacheContainer = (*mapIter).second;
196       auto  cacheIter      = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
197
198       if(cacheIter != cacheContainer.end())
199       {
200         cacheContainer.erase(cacheIter);
201         if(cacheContainer.empty())
202         {
203           cacheMap.erase(mapIter);
204         }
205       }
206     }
207   }
208
209   /**
210    * @brief Erase all cache that input task.
211    * @pre Mutex be locked.
212    */
213   template<typename CacheContainer>
214   static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
215   {
216     auto mapIter = cacheMap.find(task.Get());
217     if(mapIter != cacheMap.end())
218     {
219       cacheMap.erase(mapIter);
220     }
221   }
222
223 public:
224   AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
225
226   // Keep cache iterators as list since we take tasks by FIFO as default.
227   using TaskCacheContainer          = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
228   using RunningTaskCacheContainer   = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
229   using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
230
231   TaskCacheContainer          mWaitingTasksCache;   ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
232   RunningTaskCacheContainer   mRunningTasksCache;   ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
233   CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
234 };
235
236 // AsyncTaskManager
237
238 Dali::AsyncTaskManager AsyncTaskManager::Get()
239 {
240   Dali::AsyncTaskManager manager;
241   SingletonService       singletonService(SingletonService::Get());
242   if(singletonService)
243   {
244     // Check whether the async task manager is already created
245     Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
246     if(handle)
247     {
248       // If so, downcast the handle of singleton
249       manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
250     }
251
252     if(!manager)
253     {
254       // If not, create the async task manager and register it as a singleton
255       Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
256       manager                                                       = Dali::AsyncTaskManager(internalAsyncTaskManager);
257       singletonService.Register(typeid(manager), manager);
258     }
259   }
260   return manager;
261 }
262
263 AsyncTaskManager::AsyncTaskManager()
264 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
265   mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
266   mWaitingHighProirityTaskCounts(0u),
267   mCacheImpl(new CacheImpl(*this)),
268   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
269   mProcessorRegistered(false)
270 {
271 }
272
273 AsyncTaskManager::~AsyncTaskManager()
274 {
275   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
276   {
277     mProcessorRegistered = false;
278     Dali::Adaptor::Get().UnregisterProcessor(*this);
279   }
280
281   mTasks.Clear();
282 }
283
284 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
285 {
286   if(task)
287   {
288     // Lock while adding task to the queue
289     Mutex::ScopedLock lock(mWaitingTasksMutex);
290
291     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
292
293     // push back into waiting queue.
294     auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
295     CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
296
297     if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
298     {
299       // Increase the number of waiting tasks for high priority.
300       ++mWaitingHighProirityTaskCounts;
301     }
302
303     {
304       // For thread safety
305       Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
306
307       // Finish all Running threads are working
308       if(mRunningTasks.size() >= mTasks.GetElementCount())
309       {
310         return;
311       }
312     }
313   }
314
315   size_t count = mTasks.GetElementCount();
316   size_t index = 0;
317   while(index++ < count)
318   {
319     auto processHelperIt = mTasks.GetNext();
320     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
321     if(processHelperIt->Request())
322     {
323       break;
324     }
325     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
326   }
327
328   // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
329   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
330   {
331     Dali::Adaptor::Get().RegisterProcessor(*this);
332     mProcessorRegistered = true;
333   }
334
335   return;
336 }
337
338 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
339 {
340   if(task)
341   {
342     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
343
344     // Check whether we need to unregister processor.
345     // If there is some non-empty queue exist, we don't need to unregister processor.
346     bool needCheckUnregisterProcessor = true;
347
348     {
349       // Lock while remove task from the queue
350       Mutex::ScopedLock lock(mWaitingTasksMutex);
351
352       auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
353       if(mapIter != mCacheImpl->mWaitingTasksCache.end())
354       {
355         for(auto& iterator : mapIter->second)
356         {
357           DALI_ASSERT_DEBUG((*iterator) == task);
358           if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
359           {
360             // Decrease the number of waiting tasks for high priority.
361             --mWaitingHighProirityTaskCounts;
362           }
363           mWaitingTasks.erase(iterator);
364         }
365         CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
366       }
367
368       if(!mWaitingTasks.empty())
369       {
370         needCheckUnregisterProcessor = false;
371       }
372     }
373
374     {
375       // Lock while remove task from the queue
376       Mutex::ScopedLock lock(mRunningTasksMutex);
377
378       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
379       if(mapIter != mCacheImpl->mRunningTasksCache.end())
380       {
381         for(auto& iterator : mapIter->second)
382         {
383           DALI_ASSERT_DEBUG((*iterator).first == task);
384           // We cannot erase container. Just mark as canceled.
385           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
386           (*iterator).second = RunningTaskState::CANCELED;
387         }
388       }
389
390       if(!mRunningTasks.empty())
391       {
392         needCheckUnregisterProcessor = false;
393       }
394     }
395
396     {
397       // Lock while remove task from the queue
398       Mutex::ScopedLock lock(mCompletedTasksMutex);
399
400       auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
401       if(mapIter != mCacheImpl->mCompletedTasksCache.end())
402       {
403         for(auto& iterator : mapIter->second)
404         {
405           DALI_ASSERT_DEBUG(iterator->first == task);
406           mCompletedTasks.erase(iterator);
407         }
408         CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
409       }
410
411       if(!mCompletedTasks.empty())
412       {
413         needCheckUnregisterProcessor = false;
414       }
415     }
416
417     // UnregisterProcessor required to lock mutex. Call this API only if required.
418     if(needCheckUnregisterProcessor)
419     {
420       UnregisterProcessor();
421     }
422   }
423 }
424
425 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
426 {
427   // Lock while popping task out from the queue
428   Mutex::ScopedLock lock(mCompletedTasksMutex);
429
430   AsyncTaskPtr nextCompletedTask = nullptr;
431
432   while(!mCompletedTasks.empty())
433   {
434     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
435
436     auto               next      = mCompletedTasks.begin();
437     AsyncTaskPtr       nextTask  = next->first;
438     CompletedTaskState taskState = next->second;
439     CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
440     mCompletedTasks.erase(next);
441
442     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
443
444     if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
445     {
446       nextCompletedTask = nextTask;
447       break;
448     }
449   }
450
451   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
452
453   return nextCompletedTask;
454 }
455
456 void AsyncTaskManager::UnregisterProcessor()
457 {
458   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
459   {
460     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
461     // Keep processor at least 1 task exist.
462     // Please be careful the order of mutex, to avoid dead lock.
463     // TODO : Should we lock all mutex rightnow?
464     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
465     if(mWaitingTasks.empty())
466     {
467       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
468       if(mRunningTasks.empty())
469       {
470         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
471         if(mCompletedTasks.empty())
472         {
473           mProcessorRegistered = false;
474           Dali::Adaptor::Get().UnregisterProcessor(*this);
475         }
476       }
477     }
478     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
479   }
480 }
481
482 void AsyncTaskManager::TasksCompleted()
483 {
484   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
485   while(AsyncTaskPtr task = PopNextCompletedTask())
486   {
487     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
488     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
489   }
490
491   UnregisterProcessor();
492   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
493 }
494
495 void AsyncTaskManager::Process(bool postProcessor)
496 {
497   TasksCompleted();
498 }
499
500 /// Worker thread called
501 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
502 {
503   // Lock while popping task out from the queue
504   Mutex::ScopedLock lock(mWaitingTasksMutex);
505
506   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
507
508   // pop out the next task from the queue
509   AsyncTaskPtr nextTask = nullptr;
510
511   // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
512   if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
513   {
514     // For thread safety
515     Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
516
517     if(mAvaliableLowPriorityTaskCounts == 0u)
518     {
519       // There are no avaliabe tasks to run now. Return nullptr.
520       return nextTask;
521     }
522   }
523
524   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
525   {
526     if((*iter)->IsReady())
527     {
528       const auto priorityType  = (*iter)->GetPriorityType();
529       bool       taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
530       if(!taskAvaliable)
531       {
532         // For thread safety
533         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
534
535         taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
536       }
537
538       if(taskAvaliable)
539       {
540         nextTask = *iter;
541
542         // Add Running queue
543         {
544           // Lock while popping task out from the queue
545           Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
546
547           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
548
549           auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
550           CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
551
552           CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
553           mWaitingTasks.erase(iter);
554
555           // Decrease avaliable task counts if it is low priority
556           if(priorityType == AsyncTask::PriorityType::LOW)
557           {
558             // We are under running task mutex. We can decrease it.
559             --mAvaliableLowPriorityTaskCounts;
560           }
561         }
562
563         if(priorityType == AsyncTask::PriorityType::HIGH)
564         {
565           // Decrease the number of waiting tasks for high priority.
566           --mWaitingHighProirityTaskCounts;
567         }
568         break;
569       }
570     }
571   }
572
573   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
574
575   return nextTask;
576 }
577
578 /// Worker thread called
579 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
580 {
581   bool notify = false;
582
583   if(task)
584   {
585     const bool needTrigger = task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD;
586
587     // Lock while check validation of task.
588     {
589       Mutex::ScopedLock lock(mRunningTasksMutex);
590
591       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
592       if(mapIter != mCacheImpl->mRunningTasksCache.end())
593       {
594         const auto cacheIter = mapIter->second.begin();
595         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
596
597         const auto iter = *cacheIter;
598         DALI_ASSERT_DEBUG(iter->first == task);
599         if(iter->second == RunningTaskState::RUNNING)
600         {
601           // This task is valid.
602           notify = true;
603         }
604       }
605
606       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
607     }
608
609     // We should execute this tasks complete callback out of mutex
610     if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
611     {
612       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
613       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
614     }
615
616     // Lock while adding task to the queue
617     {
618       Mutex::ScopedLock lock(mRunningTasksMutex);
619
620       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
621       if(mapIter != mCacheImpl->mRunningTasksCache.end())
622       {
623         const auto cacheIter = mapIter->second.begin();
624         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
625
626         const auto iter         = *cacheIter;
627         const auto priorityType = iter->first->GetPriorityType();
628         // Increase avaliable task counts if it is low priority
629         if(priorityType == AsyncTask::PriorityType::LOW)
630         {
631           // We are under running task mutex. We can increase it.
632           ++mAvaliableLowPriorityTaskCounts;
633         }
634
635         // Move task into completed, for ensure that AsyncTask destroy at main thread.
636         {
637           Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
638
639           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
640
641           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
642
643           auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
644           CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
645
646           CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
647           mRunningTasks.erase(iter);
648
649           // Now, task is invalidate.
650           task.Reset();
651         }
652       }
653     }
654
655     // Wake up the main thread
656     if(needTrigger)
657     {
658       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
659       mTrigger->Trigger();
660     }
661   }
662 }
663
664 // AsyncTaskManager::TaskHelper
665
666 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
667 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
668 {
669 }
670
671 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
672 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
673 {
674 }
675
676 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
677 : mProcessor(std::move(processor)),
678   mAsyncTaskManager(asyncTaskManager)
679 {
680 }
681
682 bool AsyncTaskManager::TaskHelper::Request()
683 {
684   return mProcessor->Request();
685 }
686 } // namespace Adaptor
687
688 } // namespace Internal
689
690 } // namespace Dali