[dali_2.3.21] Merge branch 'devel/master'
[platform/core/uifw/dali-toolkit.git] / automated-tests / src / dali-toolkit / dali-toolkit-test-utils / toolkit-async-task-manager.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 // CLASS HEADER
19 #include <toolkit-async-task-manager.h>
20
21 // EXTERNAL INCLUDE
22 #include <dali/devel-api/adaptor-framework/thread-settings.h>
23 #include <dali/devel-api/threading/conditional-wait.h>
24 #include <dali/devel-api/threading/mutex.h>
25 #include <dali/devel-api/threading/thread.h>
26 #include <dali/public-api/adaptor-framework/async-task-manager.h>
27 #include <dali/public-api/adaptor-framework/round-robin-container-view.h>
28 #include <dali/public-api/common/list-wrapper.h>
29 #include <dali/public-api/object/base-object.h>
30 #include <memory>
31 #include <unordered_map>
32
33 // INTERNAL INCLUDE
34 #include <toolkit-application.h>
35 #include <toolkit-environment-variable.h>
36 #include <toolkit-event-thread-callback.h>
37 #include "dali-test-suite-utils.h"
38
39 namespace Dali
40 {
41 namespace Internal
42 {
43 namespace Adaptor
44 {
namespace
{
/// Shared counter used to number worker threads: each AsyncTaskThread pre-increments it
/// in its constructor and stores the result as its own id (later used in the thread name).
std::atomic_uint32_t gThreadId = 0u;
} // namespace
49
50 class AsyncTaskThread;
51
52 class AsyncTaskManager : public Dali::BaseObject
53 {
54 public:
55   static Dali::AsyncTaskManager Get();
56
57   AsyncTaskManager();
58   ~AsyncTaskManager();
59
60 public:
61   void AddTask(AsyncTaskPtr task);
62   void RemoveTask(AsyncTaskPtr task);
63
64   Dali::AsyncTaskManager::TasksCompletedId SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask);
65   bool                                     RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId);
66
67 public:
68   AsyncTaskPtr PopNextCompletedTask();
69   void         TaskCompleted();
70   void         TaskAllCompleted();
71
72 public: // Worker thread called method
73   AsyncTaskPtr PopNextTaskToProcess();
74   void         CompleteTask(AsyncTaskPtr&& task);
75
76 private: /**
77    * @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
78    */
79   class TaskHelper
80   {
81   public:
82     /**
83      * @brief Create an TaskHelper.
84      *
85      * @param[in] asyncTaskManager Reference to the AsyncTaskManager
86      */
87     TaskHelper(AsyncTaskManager& asyncTaskManager);
88
89     /**
90      * @brief Request the thread to process the task.
91      * @return True if the request succeeds, otherwise false.
92      */
93     bool Request();
94
95   public:
96     TaskHelper(const TaskHelper&) = delete;
97     TaskHelper& operator=(const TaskHelper&) = delete;
98
99     TaskHelper(TaskHelper&& rhs);
100     TaskHelper& operator=(TaskHelper&& rhs) = delete;
101
102   private:
103     /**
104      * @brief Main constructor that used by all other constructors
105      */
106     TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager);
107
108   private:
109     std::unique_ptr<AsyncTaskThread> mProcessor;
110     AsyncTaskManager&                mAsyncTaskManager;
111   };
112
113   /**
114    * @brief State of running task
115    */
116   enum class RunningTaskState
117   {
118     RUNNING  = 0, ///< Running task
119     CANCELED = 1, ///< Canceled by user
120   };
121
122   /**
123    * @brief State of complete task
124    */
125   enum class CompletedTaskState
126   {
127     REQUIRE_CALLBACK = 0, ///< Need to execute callback when completed task process.
128     SKIP_CALLBACK    = 1, ///< Do not execute callback
129   };
130
131 private:
132   // Undefined
133   AsyncTaskManager(const AsyncTaskManager& manager);
134
135   // Undefined
136   AsyncTaskManager& operator=(const AsyncTaskManager& manager);
137
138 private:
139   // Keep Task as list since we take tasks by FIFO as default.
140   using AsyncTaskContainer = std::list<AsyncTaskPtr>;
141
142   using AsyncRunningTaskPair      = std::pair<AsyncTaskPtr, RunningTaskState>;
143   using AsyncRunningTaskContainer = std::list<AsyncRunningTaskPair>;
144
145   using AsyncCompletedTaskPair      = std::pair<AsyncTaskPtr, CompletedTaskState>;
146   using AsyncCompletedTaskContainer = std::list<AsyncCompletedTaskPair>;
147
148   AsyncTaskContainer          mWaitingTasks;   ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
149   AsyncRunningTaskContainer   mRunningTasks;   ///< The queue of the running tasks. Must be locked under mRunningTasksMutex.
150   AsyncCompletedTaskContainer mCompletedTasks; ///< The queue of the tasks with the async process. Must be locked under mCompletedTasksMutex.
151
152   RoundRobinContainerView<TaskHelper> mTasks;
153
154   Dali::Mutex mWaitingTasksMutex;   ///< Mutex for mWaitingTasks. We can lock mRunningTasksMutex and mCompletedTasksMutex under this scope.
155   Dali::Mutex mRunningTasksMutex;   ///< Mutex for mRunningTasks. We can lock mCompletedTasksMutex under this scope.
156   Dali::Mutex mCompletedTasksMutex; ///< Mutex for mCompletedTasks. We cannot lock any mutex under this scope.
157
158   std::unique_ptr<EventThreadCallback> mTrigger;
159
160   struct TasksCompletedImpl;
161   std::unique_ptr<TasksCompletedImpl> mTasksCompletedImpl; ///< TaskS completed signal interface for AsyncTaskManager.
162 };
163
164 inline Internal::Adaptor::AsyncTaskManager& GetImplementation(Dali::AsyncTaskManager& obj)
165 {
166   DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
167
168   Dali::BaseObject& handle = obj.GetBaseObject();
169
170   return static_cast<Internal::Adaptor::AsyncTaskManager&>(handle);
171 }
172
173 inline const Internal::Adaptor::AsyncTaskManager& GetImplementation(const Dali::AsyncTaskManager& obj)
174 {
175   DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
176
177   const Dali::BaseObject& handle = obj.GetBaseObject();
178
179   return static_cast<const Internal::Adaptor::AsyncTaskManager&>(handle);
180 }
181
182 /********************************************************************************/
183 /*********************************  INTERNAL CLASS  *****************************/
184 /********************************************************************************/
185
186 namespace
187 {
constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Force a TasksCompleted() trigger once the number of completed tasks grows beyond this.

constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};                            ///< Worker thread count used when the environment variable gives no usable value.
constexpr auto NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE"; ///< Name of the environment variable that overrides the worker thread count.
192
193 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
194 {
195   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
196   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
197   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
198   DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
199   return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
200 }
201
/// Per-thread manager handle; left empty until AsyncTaskManager::Get() fills it on first use.
thread_local Dali::AsyncTaskManager gAsyncTaskManager;
203 } // namespace
204
205 /**
206  * The worker thread for async process
207  */
208 class AsyncTaskThread : public Thread
209 {
210 public:
211   /**
212    * Constructor.
213    */
214   AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
215   : mConditionalWait(),
216     mAsyncTaskManager(asyncTaskManager),
217     mThreadId(++gThreadId),
218     mDestroyThread(false),
219     mIsThreadStarted(false),
220     mIsThreadIdle(true)
221   {
222   }
223
224   /**
225    * Destructor.
226    */
227   ~AsyncTaskThread() override
228   {
229     // Stop the thread
230     {
231       ConditionalWait::ScopedLock lock(mConditionalWait);
232       mDestroyThread = true;
233       mConditionalWait.Notify(lock);
234     }
235
236     Join();
237   }
238
239   /**
240    * @brief Request the thread to process the task.
241    * @return True if the request is successed, otherwise false.
242    */
243   bool Request()
244   {
245     if(!mIsThreadStarted)
246     {
247       Start();
248       mIsThreadStarted = true;
249     }
250
251     {
252       // Lock while adding task to the queue
253       ConditionalWait::ScopedLock lock(mConditionalWait);
254
255       if(mIsThreadIdle)
256       {
257         mIsThreadIdle = false;
258
259         // wake up the thread
260         mConditionalWait.Notify(lock);
261         return true;
262       }
263     }
264
265     return false;
266   }
267
268 protected:
269   /**
270    * The entry function of the worker thread.
271    */
272   void Run() override
273   {
274     {
275       char temp[100];
276       snprintf(temp, 100, "AsyncTaskThread[%u]", mThreadId);
277       SetThreadName(temp);
278     }
279
280     while(!mDestroyThread)
281     {
282       AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
283       if(!task)
284       {
285         ConditionalWait::ScopedLock lock(mConditionalWait);
286         if(!mDestroyThread)
287         {
288           mIsThreadIdle = true;
289           mConditionalWait.Wait(lock);
290         }
291       }
292       else
293       {
294         tet_printf("BEGIN: AsyncTask[%s] Process\n", task->GetTaskName().data());
295         task->Process();
296         tet_printf("END: AsyncTask[%s] Process\n", task->GetTaskName().data());
297         if(!mDestroyThread)
298         {
299           mAsyncTaskManager.CompleteTask(std::move(task));
300         }
301       }
302     }
303   }
304
305 private:
306   // Undefined
307   AsyncTaskThread(const AsyncTaskThread& thread) = delete;
308
309   // Undefined
310   AsyncTaskThread& operator=(const AsyncTaskThread& thread) = delete;
311
312 private:
313   ConditionalWait   mConditionalWait;
314   AsyncTaskManager& mAsyncTaskManager;
315   uint32_t          mThreadId;
316   bool              mDestroyThread;
317   bool              mIsThreadStarted;
318   bool              mIsThreadIdle;
319 };
320
321 // AsyncTaskManager::TasksCompletedImpl
322
323 struct AsyncTaskManager::TasksCompletedImpl
324 {
325   TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
326   : mManager(manager),
327     mTrigger(trigger),
328     mEmitCompletedTaskTriggered(false)
329   {
330   }
331
332 public:
333   /**
334    * @brief Create new tasks completed id and.
335    * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
336    * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
337    */
338   Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
339   {
340     // Lock while adding tasks completed callback list to the queue
341     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
342
343     auto id = mTasksCompletedCount++;
344     DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
345
346     mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
347     return id;
348   }
349
350   /**
351    * @brief Append task that will be trace.
352    * @post RemoveTaskTrace should be called.
353    * @param[in] id The id of tasks completed.
354    * @param[in] task The task want to trace.
355    */
356   void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
357   {
358     // Lock while adding tasks completed callback list to the queue
359     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
360
361     auto iter = mTasksCompletedCallbackList.find(id);
362     if(iter == mTasksCompletedCallbackList.end())
363     {
364       // This task is already erased. Ignore.
365       return;
366     }
367
368     auto& callbackData = iter->second;
369
370     auto jter = callbackData.mTasks.find(task.Get());
371
372     if(jter != callbackData.mTasks.end())
373     {
374       // Increase reference count.
375       ++(jter->second);
376     }
377     else
378     {
379       callbackData.mTasks.insert({task.Get(), 1u});
380     }
381   }
382
383   /**
384    * @brief Remove all task that were traced.
385    * @param[in] task The task want to remove trace.
386    * @param[in] taskCount The number of tasks that will be removed.
387    */
388   void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
389   {
390     if(count == 0u)
391     {
392       return;
393     }
394
395     // Lock while removing tasks completed callback list to the queue
396     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
397
398     for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
399     {
400       auto& callbackData      = iter->second;
401       bool  eraseCallbackData = false;
402
403       auto jter = callbackData.mTasks.find(task.Get());
404
405       if(jter != callbackData.mTasks.end())
406       {
407         if(jter->second <= count)
408         {
409           callbackData.mTasks.erase(jter);
410
411           if(callbackData.mTasks.empty())
412           {
413             eraseCallbackData = true;
414
415             // Move callback base into list.
416             // (To avoid task container changed during callback emit)
417             RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
418
419             iter = mTasksCompletedCallbackList.erase(iter);
420           }
421         }
422         else
423         {
424           jter->second -= count;
425         }
426       }
427
428       if(!eraseCallbackData)
429       {
430         ++iter;
431       }
432     }
433   }
434
435   /**
436    * @brief Check whether current TasksCompletedId completed or not.
437    * @param[in] id The id of tasks completed.
438    * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
439    */
440   bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
441   {
442     // Lock while removing tasks completed callback list to the queue
443     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
444
445     auto iter = mTasksCompletedCallbackList.find(id);
446     if(iter != mTasksCompletedCallbackList.end())
447     {
448       auto& callbackData = iter->second;
449       if(callbackData.mTasks.empty())
450       {
451         // Move callback base into list.
452         // (To avoid task container changed during callback emit)
453         RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
454
455         iter = mTasksCompletedCallbackList.erase(iter);
456
457         return true;
458       }
459     }
460
461     return false;
462   }
463
464   /**
465    * @brief Remove taskS completed callbacks by id.
466    * @param[in] id The id of taskS completed.
467    * @return True if taskS completed id removed. False otherwise.
468    */
469   bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
470   {
471     // Lock while removing taskS completed callback list to the queue
472     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
473
474     auto iter = mTasksCompletedCallbackList.find(id);
475     if(iter == mTasksCompletedCallbackList.end())
476     {
477       // This task is already erased, or completed.
478       // Erase from completed excute callback list.
479
480       // Lock while removing excute callback list to the queue
481       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
482
483       for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
484       {
485         if(iter->second == id)
486         {
487           iter = mExcuteCallbackList.erase(iter);
488
489           return true;
490         }
491         else
492         {
493           ++iter;
494         }
495       }
496
497       // This task is alread erased and completed. Ignore.
498       return false;
499     }
500
501     mTasksCompletedCallbackList.erase(iter);
502
503     return true;
504   }
505
506   /**
507    * @brief Emit all completed callbacks.
508    * @note This API should be called at event thread.
509    */
510   void EmitCompletedTasks()
511   {
512     ExecuteCallbackContainer executeCallbackList;
513     {
514       // Lock while removing excute callback list to the queue
515       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
516
517       mEmitCompletedTaskTriggered = false;
518
519       // Copy callback lists, for let we execute callbacks out of mutex
520       executeCallbackList = std::move(mExcuteCallbackList);
521       mExcuteCallbackList.clear();
522     }
523
524     if(!executeCallbackList.empty())
525     {
526       // Execute all callbacks
527       for(auto&& callbackPair : executeCallbackList)
528       {
529         auto& callback = callbackPair.first;
530         auto  id       = callbackPair.second;
531
532         Dali::CallbackBase::Execute(*callback, id);
533       }
534     }
535   }
536
537   /**
538    * @brief Check whether there is some completed signal what we need to trace, or not.
539    * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
540    */
541   bool IsTasksCompletedCallbackExist()
542   {
543     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
544     return !mTasksCompletedCallbackList.empty();
545   }
546
547   /**
548    * @brief Check whether there is some completed signal what we need to execute, or not.
549    * @return True if mExcuteCallbackList is not empty. False otherwise.
550    */
551   bool IsExecuteCallbackExist()
552   {
553     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
554     return !mExcuteCallbackList.empty();
555   }
556
557 private:
558   void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
559   {
560     // Lock while adding excute callback list to the queue
561     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
562
563     mExcuteCallbackList.emplace_back(std::move(callback), id);
564
565     if(!mEmitCompletedTaskTriggered)
566     {
567       mEmitCompletedTaskTriggered = true;
568       mTrigger->Trigger();
569     }
570   }
571
572 private:
573   struct CallbackData
574   {
575   public:
576     CallbackData(CallbackBase* callback)
577     : mCallback(callback),
578       mTasks()
579     {
580     }
581
582     CallbackData(CallbackData&& rhs) noexcept
583     : mCallback(std::move(rhs.mCallback)),
584       mTasks(std::move(rhs.mTasks))
585     {
586     }
587
588     CallbackData& operator=(CallbackData&& rhs) noexcept
589     {
590       if(this != &rhs)
591       {
592         mCallback = std::move(rhs.mCallback);
593         mTasks    = std::move(rhs.mTasks);
594       }
595
596       return *this;
597     }
598
599   private:
600     // Delete copy operator.
601     CallbackData(const CallbackData& rhs) = delete;
602     CallbackData& operator=(const CallbackData& rhs) = delete;
603
604   public:
605     std::unique_ptr<CallbackBase>                  mCallback;
606     std::unordered_map<const AsyncTask*, uint32_t> mTasks;
607   };
608
609 private:
610   AsyncTaskManager&    mManager; ///< Owner of this CacheImpl.
611   EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
612
613   Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
614
615   using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
616   TasksCompletedContainer mTasksCompletedCallbackList;
617
618   using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
619   ExecuteCallbackContainer mExcuteCallbackList;
620
621   Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
622   Dali::Mutex mExcuteCallbacksMutex;         ///< Mutex for mExcuteCallbackList.
623
624   bool mEmitCompletedTaskTriggered : 1;
625 };
626
627 // AsyncTaskManager
628
629 Dali::AsyncTaskManager AsyncTaskManager::Get()
630 {
631   if(!gAsyncTaskManager)
632   {
633     gAsyncTaskManager = Dali::AsyncTaskManager(new AsyncTaskManager());
634   }
635   return gAsyncTaskManager;
636 }
637
638 AsyncTaskManager::AsyncTaskManager()
639 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
640   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TaskCompleted))),
641   mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get()))
642 {
643 }
644
/**
 * @brief Destructor.
 *
 * The order of the steps matters: the worker threads are released first, so that nothing
 * can touch the tasks-completed bookkeeping or the task queues while they are torn down.
 */
AsyncTaskManager::~AsyncTaskManager()
{
  // Join all threads.
  mTasks.Clear();

  // Remove task completed impl after all threads are join.
  mTasksCompletedImpl.reset();

  // Remove tasks
  mWaitingTasks.clear();
  mRunningTasks.clear();
  mCompletedTasks.clear();
}
658
659 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
660 {
661   if(task)
662   {
663     // Lock while adding task to the queue
664     Mutex::ScopedLock lock(mWaitingTasksMutex);
665
666     // push back into waiting queue.
667     mWaitingTasks.insert(mWaitingTasks.end(), task);
668
669     {
670       // For thread safety
671       Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
672
673       // Finish all Running threads are working
674       if(mRunningTasks.size() >= mTasks.GetElementCount())
675       {
676         return;
677       }
678     }
679   }
680
681   size_t count = mTasks.GetElementCount();
682   size_t index = 0;
683   while(index++ < count)
684   {
685     auto processHelperIt = mTasks.GetNext();
686     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
687     if(processHelperIt->Request())
688     {
689       break;
690     }
691     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
692   }
693 }
694
695 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
696 {
697   if(task)
698   {
699     uint32_t removedCount = 0u;
700
701     {
702       // Lock while remove task from the queue
703       Mutex::ScopedLock lock(mWaitingTasksMutex);
704
705       for(auto iterator = mWaitingTasks.begin(); iterator != mWaitingTasks.end();)
706       {
707         if((*iterator) == task)
708         {
709           iterator = mWaitingTasks.erase(iterator);
710           ++removedCount;
711         }
712         else
713         {
714           ++iterator;
715         }
716       }
717     }
718
719     {
720       // Lock while remove task from the queue
721       Mutex::ScopedLock lock(mRunningTasksMutex);
722
723       for(auto iterator = mRunningTasks.begin(); iterator != mRunningTasks.end();)
724       {
725         if((*iterator).first == task)
726         {
727           // We cannot erase container. Just mark as canceled.
728           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
729           if((*iterator).second == RunningTaskState::RUNNING)
730           {
731             (*iterator).second = RunningTaskState::CANCELED;
732             ++removedCount;
733           }
734         }
735         ++iterator;
736       }
737     }
738
739     {
740       // Lock while remove task from the queue
741       Mutex::ScopedLock lock(mCompletedTasksMutex);
742
743       for(auto iterator = mCompletedTasks.begin(); iterator != mCompletedTasks.end();)
744       {
745         if((*iterator).first == task)
746         {
747           if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
748           {
749             ++removedCount;
750           }
751           iterator = mCompletedTasks.erase(iterator);
752         }
753         else
754         {
755           ++iterator;
756         }
757       }
758     }
759
760     // Remove TasksCompleted callback trace
761     if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
762     {
763       mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
764     }
765   }
766 }
767
768 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
769 {
770   // mTasksCompletedImpl will take ownership of callback.
771   Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
772
773   bool taskAdded = false; ///< Flag whether at least one task tracing now.
774
775   // Please be careful the order of mutex, to avoid dead lock.
776   {
777     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
778     {
779       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
780       {
781         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
782
783         // Collect all tasks from waiting tasks
784         for(auto& task : mWaitingTasks)
785         {
786           auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
787                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
788
789           if((checkMask & mask) == checkMask)
790           {
791             taskAdded = true;
792             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
793           }
794         }
795
796         // Collect all tasks from running tasks
797         for(auto& taskPair : mRunningTasks)
798         {
799           // Trace only if it is running now.
800           if(taskPair.second == RunningTaskState::RUNNING)
801           {
802             auto& task      = taskPair.first;
803             auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
804                              (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
805
806             if((checkMask & mask) == checkMask)
807             {
808               taskAdded = true;
809               mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
810             }
811           }
812         }
813
814         // Collect all tasks from complete tasks
815         for(auto& taskPair : mCompletedTasks)
816         {
817           // Trace only if it is need callback.
818           // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
819           //        If worker thread invocation, than it already remove trace at completed timing.
820           //        If canceled cases, we don't append trace at running tasks already.
821           //        So, we don't need to trace for SKIP_CALLBACK cases.
822           if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
823           {
824             auto& task      = taskPair.first;
825             auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
826                              (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
827
828             if((checkMask & mask) == checkMask)
829             {
830               taskAdded = true;
831               mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
832             }
833           }
834         }
835       }
836     }
837   }
838
839   // If there is nothing to check task, just excute callback right now.
840   if(!taskAdded)
841   {
842     mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
843   }
844   return tasksCompletedId;
845 }
846
847 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
848 {
849   return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
850 }
851
852 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
853 {
854   std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
855
856   AsyncTaskPtr nextCompletedTask = nullptr;
857   {
858     // Lock while popping task out from the queue
859     Mutex::ScopedLock lock(mCompletedTasksMutex);
860
861     while(!mCompletedTasks.empty())
862     {
863       auto               next      = mCompletedTasks.begin();
864       AsyncTaskPtr       nextTask  = next->first;
865       CompletedTaskState taskState = next->second;
866       mCompletedTasks.erase(next);
867
868       if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
869       {
870         nextCompletedTask = nextTask;
871         break;
872       }
873
874       ignoredTaskList.push_back(nextTask);
875     }
876   }
877
878   return nextCompletedTask;
879 }
880
881 void AsyncTaskManager::TaskCompleted()
882 {
883   // For UTC, let we complete only 1 task here.
884   if(AsyncTaskPtr task = PopNextCompletedTask())
885   {
886     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
887
888     // Remove TasksCompleted callback trace
889     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
890     {
891       mTasksCompletedImpl->RemoveTaskTrace(task);
892     }
893   }
894
895   mTasksCompletedImpl->EmitCompletedTasks();
896 }
897
898 void AsyncTaskManager::TaskAllCompleted()
899 {
900   while(AsyncTaskPtr task = PopNextCompletedTask())
901   {
902     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
903
904     // Remove TasksCompleted callback trace
905     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
906     {
907       mTasksCompletedImpl->RemoveTaskTrace(task);
908     }
909   }
910
911   mTasksCompletedImpl->EmitCompletedTasks();
912 }
913
914 /// Worker thread called
915 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
916 {
917   // Lock while popping task out from the queue
918   Mutex::ScopedLock lock(mWaitingTasksMutex);
919
920   // pop out the next task from the queue
921   AsyncTaskPtr nextTask = nullptr;
922
923   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
924   {
925     if((*iter)->IsReady())
926     {
927       nextTask = *iter;
928
929       // Add Running queue
930       {
931         // Lock while popping task out from the queue
932         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
933
934         mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
935
936         mWaitingTasks.erase(iter);
937       }
938       break;
939     }
940   }
941
942   return nextTask;
943 }
944
945 /// Worker thread called
946 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
947 {
948   bool notify = false;
949
950   if(task)
951   {
952     bool needTrigger = false;
953
954     // Lock while check validation of task.
955     {
956       Mutex::ScopedLock lock(mRunningTasksMutex);
957
958       auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
959       if(iter != mRunningTasks.end())
960       {
961         if(iter->second == RunningTaskState::RUNNING)
962         {
963           // This task is valid.
964           notify = true;
965         }
966       }
967     }
968
969     // We should execute this tasks complete callback out of mutex
970     if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
971     {
972       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
973
974       // We need to remove task trace now.
975       if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
976       {
977         mTasksCompletedImpl->RemoveTaskTrace(task);
978
979         if(mTasksCompletedImpl->IsExecuteCallbackExist())
980         {
981           // We need to call EmitCompletedTasks(). Trigger main thread.
982           needTrigger = true;
983         }
984       }
985     }
986
987     // Lock while adding task to the queue
988     {
989       Mutex::ScopedLock lock(mRunningTasksMutex);
990
991       auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
992       if(iter != mRunningTasks.end())
993       {
994         // Move task into completed, for ensure that AsyncTask destroy at main thread.
995         {
996           Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
997
998           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
999
1000           needTrigger |= callbackRequired;
1001
1002           mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1003
1004           mRunningTasks.erase(iter);
1005
1006           if(!needTrigger)
1007           {
1008             needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1009           }
1010
1011           // Now, task is invalidate.
1012           task.Reset();
1013         }
1014       }
1015     }
1016
1017     // Wake up the main thread
1018     if(needTrigger)
1019     {
1020       mTrigger->Trigger();
1021     }
1022   }
1023 }
1024
1025 // AsyncTaskManager::TaskHelper
1026
1027 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1028 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1029 {
1030 }
1031
1032 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1033 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1034 {
1035 }
1036
1037 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1038 : mProcessor(std::move(processor)),
1039   mAsyncTaskManager(asyncTaskManager)
1040 {
1041 }
1042
1043 bool AsyncTaskManager::TaskHelper::Request()
1044 {
1045   return mProcessor->Request();
1046 }
1047
1048 } // namespace Adaptor
1049
1050 } // namespace Internal
1051
1052 /********************************************************************************/
1053 /*********************************  PUBLIC CLASS  *******************************/
1054 /********************************************************************************/
1055
1056 AsyncTaskManager::AsyncTaskManager() = default;
1057
1058 AsyncTaskManager::~AsyncTaskManager() = default;
1059
1060 AsyncTaskManager AsyncTaskManager::Get()
1061 {
1062   return Internal::Adaptor::AsyncTaskManager::Get();
1063 }
1064
1065 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
1066 {
1067   Internal::Adaptor::GetImplementation(*this).AddTask(task);
1068 }
1069
1070 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
1071 {
1072   Internal::Adaptor::GetImplementation(*this).RemoveTask(task);
1073 }
1074
1075 AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, AsyncTaskManager::CompletedCallbackTraceMask mask)
1076 {
1077   return Internal::Adaptor::GetImplementation(*this).SetCompletedCallback(callback, mask);
1078 }
1079
1080 bool AsyncTaskManager::RemoveCompletedCallback(AsyncTaskManager::TasksCompletedId tasksCompletedId)
1081 {
1082   return Internal::Adaptor::GetImplementation(*this).RemoveCompletedCallback(tasksCompletedId);
1083 }
1084
1085 AsyncTaskManager::AsyncTaskManager(Internal::Adaptor::AsyncTaskManager* impl)
1086 : BaseHandle(impl)
1087 {
1088 }
1089
1090 } // namespace Dali
1091
1092 namespace Test
1093 {
1094 namespace AsyncTaskManager
1095 {
1096 void DestroyAsyncTaskManager()
1097 {
1098   Dali::Internal::Adaptor::gAsyncTaskManager.Reset();
1099 }
1100
1101 void ProcessSingleCompletedTask()
1102 {
1103   auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1104   Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskCompleted();
1105 }
1106
1107 void ProcessAllCompletedTask()
1108 {
1109   auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1110   Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskAllCompleted();
1111 }
1112 } // namespace AsyncTaskManager
1113 } // namespace Test