Sync AsyncTaskManager code with adaptor
[platform/core/uifw/dali-toolkit.git] / automated-tests / src / dali-toolkit / dali-toolkit-test-utils / toolkit-async-task-manager.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 // CLASS HEADER
19 #include <toolkit-async-task-manager.h>
20
21 // EXTERNAL INCLUDE
22 #include <dali/devel-api/adaptor-framework/thread-settings.h>
23 #include <dali/devel-api/threading/conditional-wait.h>
24 #include <dali/devel-api/threading/mutex.h>
25 #include <dali/devel-api/threading/thread.h>
26 #include <dali/public-api/adaptor-framework/async-task-manager.h>
27 #include <dali/public-api/adaptor-framework/round-robin-container-view.h>
28 #include <dali/public-api/common/list-wrapper.h>
29 #include <dali/public-api/object/base-object.h>
30 #include <memory>
31 #include <unordered_map>
32
33 // INTERNAL INCLUDE
34 #include <toolkit-application.h>
35 #include <toolkit-environment-variable.h>
36 #include <toolkit-event-thread-callback.h>
37
38 namespace Dali
39 {
40 namespace Internal
41 {
42 namespace Adaptor
43 {
namespace
{
/// Running counter; each AsyncTaskThread pre-increments it to obtain the number used in its thread name.
std::atomic_uint32_t gThreadId{0u};
} // namespace
48
49 class AsyncTaskThread;
50
51 class AsyncTaskManager : public Dali::BaseObject
52 {
53 public:
54   static Dali::AsyncTaskManager Get();
55
56   AsyncTaskManager();
57   ~AsyncTaskManager();
58
59 public:
60   void AddTask(AsyncTaskPtr task);
61   void RemoveTask(AsyncTaskPtr task);
62
63   Dali::AsyncTaskManager::TasksCompletedId SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask);
64   bool                                     RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId);
65
66 public:
67   AsyncTaskPtr PopNextCompletedTask();
68   void         TaskCompleted();
69   void         TaskAllCompleted();
70
71 public: // Worker thread called method
72   AsyncTaskPtr PopNextTaskToProcess();
73   void         CompleteTask(AsyncTaskPtr&& task);
74
75 private: /**
76    * @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
77    */
78   class TaskHelper
79   {
80   public:
81     /**
82      * @brief Create an TaskHelper.
83      *
84      * @param[in] asyncTaskManager Reference to the AsyncTaskManager
85      */
86     TaskHelper(AsyncTaskManager& asyncTaskManager);
87
88     /**
89      * @brief Request the thread to process the task.
90      * @return True if the request succeeds, otherwise false.
91      */
92     bool Request();
93
94   public:
95     TaskHelper(const TaskHelper&) = delete;
96     TaskHelper& operator=(const TaskHelper&) = delete;
97
98     TaskHelper(TaskHelper&& rhs);
99     TaskHelper& operator=(TaskHelper&& rhs) = delete;
100
101   private:
102     /**
103      * @brief Main constructor that used by all other constructors
104      */
105     TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager);
106
107   private:
108     std::unique_ptr<AsyncTaskThread> mProcessor;
109     AsyncTaskManager&                mAsyncTaskManager;
110   };
111
112   /**
113    * @brief State of running task
114    */
115   enum class RunningTaskState
116   {
117     RUNNING  = 0, ///< Running task
118     CANCELED = 1, ///< Canceled by user
119   };
120
121   /**
122    * @brief State of complete task
123    */
124   enum class CompletedTaskState
125   {
126     REQUIRE_CALLBACK = 0, ///< Need to execute callback when completed task process.
127     SKIP_CALLBACK    = 1, ///< Do not execute callback
128   };
129
130 private:
131   // Undefined
132   AsyncTaskManager(const AsyncTaskManager& manager);
133
134   // Undefined
135   AsyncTaskManager& operator=(const AsyncTaskManager& manager);
136
137 private:
138   // Keep Task as list since we take tasks by FIFO as default.
139   using AsyncTaskContainer = std::list<AsyncTaskPtr>;
140
141   using AsyncRunningTaskPair      = std::pair<AsyncTaskPtr, RunningTaskState>;
142   using AsyncRunningTaskContainer = std::list<AsyncRunningTaskPair>;
143
144   using AsyncCompletedTaskPair      = std::pair<AsyncTaskPtr, CompletedTaskState>;
145   using AsyncCompletedTaskContainer = std::list<AsyncCompletedTaskPair>;
146
147   AsyncTaskContainer          mWaitingTasks;   ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
148   AsyncRunningTaskContainer   mRunningTasks;   ///< The queue of the running tasks. Must be locked under mRunningTasksMutex.
149   AsyncCompletedTaskContainer mCompletedTasks; ///< The queue of the tasks with the async process. Must be locked under mCompletedTasksMutex.
150
151   RoundRobinContainerView<TaskHelper> mTasks;
152
153   Dali::Mutex mWaitingTasksMutex;   ///< Mutex for mWaitingTasks. We can lock mRunningTasksMutex and mCompletedTasksMutex under this scope.
154   Dali::Mutex mRunningTasksMutex;   ///< Mutex for mRunningTasks. We can lock mCompletedTasksMutex under this scope.
155   Dali::Mutex mCompletedTasksMutex; ///< Mutex for mCompletedTasks. We cannot lock any mutex under this scope.
156
157   std::unique_ptr<EventThreadCallback> mTrigger;
158
159   struct TasksCompletedImpl;
160   std::unique_ptr<TasksCompletedImpl> mTasksCompletedImpl; ///< TaskS completed signal interface for AsyncTaskManager.
161 };
162
163 inline Internal::Adaptor::AsyncTaskManager& GetImplementation(Dali::AsyncTaskManager& obj)
164 {
165   DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
166
167   Dali::BaseObject& handle = obj.GetBaseObject();
168
169   return static_cast<Internal::Adaptor::AsyncTaskManager&>(handle);
170 }
171
172 inline const Internal::Adaptor::AsyncTaskManager& GetImplementation(const Dali::AsyncTaskManager& obj)
173 {
174   DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
175
176   const Dali::BaseObject& handle = obj.GetBaseObject();
177
178   return static_cast<const Internal::Adaptor::AsyncTaskManager&>(handle);
179 }
180
181 /********************************************************************************/
182 /*********************************  INTERNAL CLASS  *****************************/
183 /********************************************************************************/
184
185 namespace
186 {
187 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
188
189 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
190 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
191
192 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
193 {
194   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
195   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
196   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
197   DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
198   return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
199 }
200
201 thread_local Dali::AsyncTaskManager gAsyncTaskManager;
202 } // namespace
203
204 /**
205  * The worker thread for async process
206  */
207 class AsyncTaskThread : public Thread
208 {
209 public:
210   /**
211    * Constructor.
212    */
213   AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
214   : mConditionalWait(),
215     mAsyncTaskManager(asyncTaskManager),
216     mThreadId(++gThreadId),
217     mDestroyThread(false),
218     mIsThreadStarted(false),
219     mIsThreadIdle(true)
220   {
221   }
222
223   /**
224    * Destructor.
225    */
226   ~AsyncTaskThread() override
227   {
228     // Stop the thread
229     {
230       ConditionalWait::ScopedLock lock(mConditionalWait);
231       mDestroyThread = true;
232       mConditionalWait.Notify(lock);
233     }
234
235     Join();
236   }
237
238   /**
239    * @brief Request the thread to process the task.
240    * @return True if the request is successed, otherwise false.
241    */
242   bool Request()
243   {
244     if(!mIsThreadStarted)
245     {
246       Start();
247       mIsThreadStarted = true;
248     }
249
250     {
251       // Lock while adding task to the queue
252       ConditionalWait::ScopedLock lock(mConditionalWait);
253
254       if(mIsThreadIdle)
255       {
256         mIsThreadIdle = false;
257
258         // wake up the thread
259         mConditionalWait.Notify(lock);
260         return true;
261       }
262     }
263
264     return false;
265   }
266
267 protected:
268   /**
269    * The entry function of the worker thread.
270    */
271   void Run() override
272   {
273     {
274       char temp[100];
275       snprintf(temp, 100, "AsyncTaskThread[%u]", mThreadId);
276       SetThreadName(temp);
277     }
278
279     while(!mDestroyThread)
280     {
281       AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
282       if(!task)
283       {
284         ConditionalWait::ScopedLock lock(mConditionalWait);
285         if(!mDestroyThread)
286         {
287           mIsThreadIdle = true;
288           mConditionalWait.Wait(lock);
289         }
290       }
291       else
292       {
293         task->Process();
294         if(!mDestroyThread)
295         {
296           mAsyncTaskManager.CompleteTask(std::move(task));
297         }
298       }
299     }
300   }
301
302 private:
303   // Undefined
304   AsyncTaskThread(const AsyncTaskThread& thread) = delete;
305
306   // Undefined
307   AsyncTaskThread& operator=(const AsyncTaskThread& thread) = delete;
308
309 private:
310   ConditionalWait   mConditionalWait;
311   AsyncTaskManager& mAsyncTaskManager;
312   uint32_t          mThreadId;
313   bool              mDestroyThread;
314   bool              mIsThreadStarted;
315   bool              mIsThreadIdle;
316 };
317
318 // AsyncTaskManager::TasksCompletedImpl
319
320 struct AsyncTaskManager::TasksCompletedImpl
321 {
322   TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
323   : mManager(manager),
324     mTrigger(trigger),
325     mEmitCompletedTaskTriggered(false)
326   {
327   }
328
329 public:
330   /**
331    * @brief Create new tasks completed id and.
332    * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
333    * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
334    */
335   Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
336   {
337     // Lock while adding tasks completed callback list to the queue
338     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
339
340     auto id = mTasksCompletedCount++;
341     DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
342
343     mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
344     return id;
345   }
346
347   /**
348    * @brief Append task that will be trace.
349    * @post RemoveTaskTrace should be called.
350    * @param[in] id The id of tasks completed.
351    * @param[in] task The task want to trace.
352    */
353   void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
354   {
355     // Lock while adding tasks completed callback list to the queue
356     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
357
358     auto iter = mTasksCompletedCallbackList.find(id);
359     if(iter == mTasksCompletedCallbackList.end())
360     {
361       // This task is already erased. Ignore.
362       return;
363     }
364
365     auto& callbackData = iter->second;
366
367     auto jter = callbackData.mTasks.find(task.Get());
368
369     if(jter != callbackData.mTasks.end())
370     {
371       // Increase reference count.
372       ++(jter->second);
373     }
374     else
375     {
376       callbackData.mTasks.insert({task.Get(), 1u});
377     }
378   }
379
380   /**
381    * @brief Remove all task that were traced.
382    * @param[in] task The task want to remove trace.
383    * @param[in] taskCount The number of tasks that will be removed.
384    */
385   void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
386   {
387     if(count == 0u)
388     {
389       return;
390     }
391
392     // Lock while removing tasks completed callback list to the queue
393     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
394
395     for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
396     {
397       auto& callbackData      = iter->second;
398       bool  eraseCallbackData = false;
399
400       auto jter = callbackData.mTasks.find(task.Get());
401
402       if(jter != callbackData.mTasks.end())
403       {
404         if(jter->second <= count)
405         {
406           callbackData.mTasks.erase(jter);
407
408           if(callbackData.mTasks.empty())
409           {
410             eraseCallbackData = true;
411
412             // Move callback base into list.
413             // (To avoid task container changed during callback emit)
414             RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
415
416             iter = mTasksCompletedCallbackList.erase(iter);
417           }
418         }
419         else
420         {
421           jter->second -= count;
422         }
423       }
424
425       if(!eraseCallbackData)
426       {
427         ++iter;
428       }
429     }
430   }
431
432   /**
433    * @brief Check whether current TasksCompletedId completed or not.
434    * @param[in] id The id of tasks completed.
435    * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
436    */
437   bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
438   {
439     // Lock while removing tasks completed callback list to the queue
440     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
441
442     auto iter = mTasksCompletedCallbackList.find(id);
443     if(iter != mTasksCompletedCallbackList.end())
444     {
445       auto& callbackData = iter->second;
446       if(callbackData.mTasks.empty())
447       {
448         // Move callback base into list.
449         // (To avoid task container changed during callback emit)
450         RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
451
452         iter = mTasksCompletedCallbackList.erase(iter);
453
454         return true;
455       }
456     }
457
458     return false;
459   }
460
461   /**
462    * @brief Remove taskS completed callbacks by id.
463    * @param[in] id The id of taskS completed.
464    * @return True if taskS completed id removed. False otherwise.
465    */
466   bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
467   {
468     // Lock while removing taskS completed callback list to the queue
469     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
470
471     auto iter = mTasksCompletedCallbackList.find(id);
472     if(iter == mTasksCompletedCallbackList.end())
473     {
474       // This task is already erased, or completed.
475       // Erase from completed excute callback list.
476
477       // Lock while removing excute callback list to the queue
478       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
479
480       for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
481       {
482         if(iter->second == id)
483         {
484           iter = mExcuteCallbackList.erase(iter);
485
486           return true;
487         }
488         else
489         {
490           ++iter;
491         }
492       }
493
494       // This task is alread erased and completed. Ignore.
495       return false;
496     }
497
498     mTasksCompletedCallbackList.erase(iter);
499
500     return true;
501   }
502
503   /**
504    * @brief Emit all completed callbacks.
505    * @note This API should be called at event thread.
506    */
507   void EmitCompletedTasks()
508   {
509     ExecuteCallbackContainer executeCallbackList;
510     {
511       // Lock while removing excute callback list to the queue
512       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
513
514       mEmitCompletedTaskTriggered = false;
515
516       // Copy callback lists, for let we execute callbacks out of mutex
517       executeCallbackList = std::move(mExcuteCallbackList);
518       mExcuteCallbackList.clear();
519     }
520
521     if(!executeCallbackList.empty())
522     {
523       // Execute all callbacks
524       for(auto&& callbackPair : executeCallbackList)
525       {
526         auto& callback = callbackPair.first;
527         auto  id       = callbackPair.second;
528
529         Dali::CallbackBase::Execute(*callback, id);
530       }
531     }
532   }
533
534   /**
535    * @brief Check whether there is some completed signal what we need to trace, or not.
536    * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
537    */
538   bool IsTasksCompletedCallbackExist()
539   {
540     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
541     return !mTasksCompletedCallbackList.empty();
542   }
543
544   /**
545    * @brief Check whether there is some completed signal what we need to execute, or not.
546    * @return True if mExcuteCallbackList is not empty. False otherwise.
547    */
548   bool IsExecuteCallbackExist()
549   {
550     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
551     return !mExcuteCallbackList.empty();
552   }
553
554 private:
555   void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
556   {
557     // Lock while adding excute callback list to the queue
558     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
559
560     mExcuteCallbackList.emplace_back(std::move(callback), id);
561
562     if(!mEmitCompletedTaskTriggered)
563     {
564       mEmitCompletedTaskTriggered = true;
565       mTrigger->Trigger();
566     }
567   }
568
569 private:
570   struct CallbackData
571   {
572   public:
573     CallbackData(CallbackBase* callback)
574     : mCallback(callback),
575       mTasks()
576     {
577     }
578
579     CallbackData(CallbackData&& rhs) noexcept
580     : mCallback(std::move(rhs.mCallback)),
581       mTasks(std::move(rhs.mTasks))
582     {
583     }
584
585     CallbackData& operator=(CallbackData&& rhs) noexcept
586     {
587       if(this != &rhs)
588       {
589         mCallback = std::move(rhs.mCallback);
590         mTasks    = std::move(rhs.mTasks);
591       }
592
593       return *this;
594     }
595
596   private:
597     // Delete copy operator.
598     CallbackData(const CallbackData& rhs) = delete;
599     CallbackData& operator=(const CallbackData& rhs) = delete;
600
601   public:
602     std::unique_ptr<CallbackBase>                  mCallback;
603     std::unordered_map<const AsyncTask*, uint32_t> mTasks;
604   };
605
606 private:
607   AsyncTaskManager&    mManager; ///< Owner of this CacheImpl.
608   EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
609
610   Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
611
612   using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
613   TasksCompletedContainer mTasksCompletedCallbackList;
614
615   using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
616   ExecuteCallbackContainer mExcuteCallbackList;
617
618   Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
619   Dali::Mutex mExcuteCallbacksMutex;         ///< Mutex for mExcuteCallbackList.
620
621   bool mEmitCompletedTaskTriggered : 1;
622 };
623
624 // AsyncTaskManager
625
626 Dali::AsyncTaskManager AsyncTaskManager::Get()
627 {
628   if(!gAsyncTaskManager)
629   {
630     gAsyncTaskManager = Dali::AsyncTaskManager(new AsyncTaskManager());
631   }
632   return gAsyncTaskManager;
633 }
634
635 AsyncTaskManager::AsyncTaskManager()
636 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
637   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TaskCompleted))),
638   mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get()))
639 {
640 }
641
642 AsyncTaskManager::~AsyncTaskManager()
643 {
644   // Join all threads.
645   mTasks.Clear();
646
647   // Remove task completed impl after all threads are join.
648   mTasksCompletedImpl.reset();
649
650   // Remove tasks
651   mWaitingTasks.clear();
652   mRunningTasks.clear();
653   mCompletedTasks.clear();
654 }
655
656 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
657 {
658   if(task)
659   {
660     // Lock while adding task to the queue
661     Mutex::ScopedLock lock(mWaitingTasksMutex);
662
663     // push back into waiting queue.
664     mWaitingTasks.insert(mWaitingTasks.end(), task);
665
666     {
667       // For thread safety
668       Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
669
670       // Finish all Running threads are working
671       if(mRunningTasks.size() >= mTasks.GetElementCount())
672       {
673         return;
674       }
675     }
676   }
677
678   size_t count = mTasks.GetElementCount();
679   size_t index = 0;
680   while(index++ < count)
681   {
682     auto processHelperIt = mTasks.GetNext();
683     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
684     if(processHelperIt->Request())
685     {
686       break;
687     }
688     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
689   }
690 }
691
692 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
693 {
694   if(task)
695   {
696     uint32_t removedCount = 0u;
697
698     {
699       // Lock while remove task from the queue
700       Mutex::ScopedLock lock(mWaitingTasksMutex);
701
702       for(auto iterator = mWaitingTasks.begin(); iterator != mWaitingTasks.end();)
703       {
704         if((*iterator) == task)
705         {
706           iterator = mWaitingTasks.erase(iterator);
707           ++removedCount;
708         }
709         else
710         {
711           ++iterator;
712         }
713       }
714     }
715
716     {
717       // Lock while remove task from the queue
718       Mutex::ScopedLock lock(mRunningTasksMutex);
719
720       for(auto iterator = mRunningTasks.begin(); iterator != mRunningTasks.end();)
721       {
722         if((*iterator).first == task)
723         {
724           // We cannot erase container. Just mark as canceled.
725           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
726           if((*iterator).second == RunningTaskState::RUNNING)
727           {
728             (*iterator).second = RunningTaskState::CANCELED;
729             ++removedCount;
730           }
731         }
732         ++iterator;
733       }
734     }
735
736     {
737       // Lock while remove task from the queue
738       Mutex::ScopedLock lock(mCompletedTasksMutex);
739
740       for(auto iterator = mCompletedTasks.begin(); iterator != mCompletedTasks.end();)
741       {
742         if((*iterator).first == task)
743         {
744           if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
745           {
746             ++removedCount;
747           }
748           iterator = mCompletedTasks.erase(iterator);
749         }
750         else
751         {
752           ++iterator;
753         }
754       }
755     }
756
757     // Remove TasksCompleted callback trace
758     if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
759     {
760       mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
761     }
762   }
763 }
764
765 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
766 {
767   // mTasksCompletedImpl will take ownership of callback.
768   Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
769
770   bool taskAdded = false; ///< Flag whether at least one task tracing now.
771
772   // Please be careful the order of mutex, to avoid dead lock.
773   {
774     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
775     {
776       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
777       {
778         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
779
780         // Collect all tasks from waiting tasks
781         for(auto& task : mWaitingTasks)
782         {
783           auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
784                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
785
786           if((checkMask & mask) == checkMask)
787           {
788             taskAdded = true;
789             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
790           }
791         }
792
793         // Collect all tasks from running tasks
794         for(auto& taskPair : mRunningTasks)
795         {
796           // Trace only if it is running now.
797           if(taskPair.second == RunningTaskState::RUNNING)
798           {
799             auto& task      = taskPair.first;
800             auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
801                              (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
802
803             if((checkMask & mask) == checkMask)
804             {
805               taskAdded = true;
806               mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
807             }
808           }
809         }
810
811         // Collect all tasks from complete tasks
812         for(auto& taskPair : mCompletedTasks)
813         {
814           // Trace only if it is need callback.
815           // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
816           //        If worker thread invocation, than it already remove trace at completed timing.
817           //        If canceled cases, we don't append trace at running tasks already.
818           //        So, we don't need to trace for SKIP_CALLBACK cases.
819           if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
820           {
821             auto& task      = taskPair.first;
822             auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
823                              (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
824
825             if((checkMask & mask) == checkMask)
826             {
827               taskAdded = true;
828               mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
829             }
830           }
831         }
832       }
833     }
834   }
835
836   // If there is nothing to check task, just excute callback right now.
837   if(!taskAdded)
838   {
839     mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
840   }
841   return tasksCompletedId;
842 }
843
844 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
845 {
846   return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
847 }
848
849 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
850 {
851   std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
852
853   AsyncTaskPtr nextCompletedTask = nullptr;
854   {
855     // Lock while popping task out from the queue
856     Mutex::ScopedLock lock(mCompletedTasksMutex);
857
858     while(!mCompletedTasks.empty())
859     {
860       auto               next      = mCompletedTasks.begin();
861       AsyncTaskPtr       nextTask  = next->first;
862       CompletedTaskState taskState = next->second;
863       mCompletedTasks.erase(next);
864
865       if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
866       {
867         nextCompletedTask = nextTask;
868         break;
869       }
870
871       ignoredTaskList.push_back(nextTask);
872     }
873   }
874
875   return nextCompletedTask;
876 }
877
878 void AsyncTaskManager::TaskCompleted()
879 {
880   // For UTC, let we complete only 1 task here.
881   if(AsyncTaskPtr task = PopNextCompletedTask())
882   {
883     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
884
885     // Remove TasksCompleted callback trace
886     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
887     {
888       mTasksCompletedImpl->RemoveTaskTrace(task);
889     }
890   }
891
892   mTasksCompletedImpl->EmitCompletedTasks();
893 }
894
895 void AsyncTaskManager::TaskAllCompleted()
896 {
897   while(AsyncTaskPtr task = PopNextCompletedTask())
898   {
899     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
900
901     // Remove TasksCompleted callback trace
902     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
903     {
904       mTasksCompletedImpl->RemoveTaskTrace(task);
905     }
906   }
907
908   mTasksCompletedImpl->EmitCompletedTasks();
909 }
910
911 /// Worker thread called
912 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
913 {
914   // Lock while popping task out from the queue
915   Mutex::ScopedLock lock(mWaitingTasksMutex);
916
917   // pop out the next task from the queue
918   AsyncTaskPtr nextTask = nullptr;
919
920   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
921   {
922     if((*iter)->IsReady())
923     {
924       nextTask = *iter;
925
926       // Add Running queue
927       {
928         // Lock while popping task out from the queue
929         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
930
931         mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
932
933         mWaitingTasks.erase(iter);
934       }
935       break;
936     }
937   }
938
939   return nextTask;
940 }
941
942 /// Worker thread called
943 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
944 {
945   bool notify = false;
946
947   if(task)
948   {
949     bool needTrigger = false;
950
951     // Lock while check validation of task.
952     {
953       Mutex::ScopedLock lock(mRunningTasksMutex);
954
955       auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
956       if(iter != mRunningTasks.end())
957       {
958         if(iter->second == RunningTaskState::RUNNING)
959         {
960           // This task is valid.
961           notify = true;
962         }
963       }
964     }
965
966     // We should execute this tasks complete callback out of mutex
967     if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
968     {
969       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
970
971       // We need to remove task trace now.
972       if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
973       {
974         mTasksCompletedImpl->RemoveTaskTrace(task);
975
976         if(mTasksCompletedImpl->IsExecuteCallbackExist())
977         {
978           // We need to call EmitCompletedTasks(). Trigger main thread.
979           needTrigger = true;
980         }
981       }
982     }
983
984     // Lock while adding task to the queue
985     {
986       Mutex::ScopedLock lock(mRunningTasksMutex);
987
988       auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
989       if(iter != mRunningTasks.end())
990       {
991         // Move task into completed, for ensure that AsyncTask destroy at main thread.
992         {
993           Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
994
995           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
996
997           needTrigger |= callbackRequired;
998
999           mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1000
1001           mRunningTasks.erase(iter);
1002
1003           if(!needTrigger)
1004           {
1005             needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1006           }
1007
1008           // Now, task is invalidate.
1009           task.Reset();
1010         }
1011       }
1012     }
1013
1014     // Wake up the main thread
1015     if(needTrigger)
1016     {
1017       mTrigger->Trigger();
1018     }
1019   }
1020 }
1021
1022 // AsyncTaskManager::TaskHelper
1023
1024 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1025 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1026 {
1027 }
1028
/// Move constructor: delegates to the (processor, manager) constructor, handing over
/// rhs.mProcessor by move and referring to the same AsyncTaskManager as rhs.
1029 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1030 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1031 {
1032 }
1033
/// Target constructor used by the other TaskHelper constructors: takes ownership of
/// @p processor and stores a reference to @p asyncTaskManager.
1034 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1035 : mProcessor(std::move(processor)),
1036   mAsyncTaskManager(asyncTaskManager)
1037 {
1038 }
1039
1040 bool AsyncTaskManager::TaskHelper::Request()
1041 {
1042   return mProcessor->Request();
1043 }
1044
1045 } // namespace Adaptor
1046
1047 } // namespace Internal
1048
1049 /********************************************************************************/
1050 /*********************************  PUBLIC CLASS  *******************************/
1051 /********************************************************************************/
1052
/// Default constructor of the public handle class; defaulted, defined out of line.
/// No implementation pointer is passed here (see the constructor taking an impl pointer).
1053 AsyncTaskManager::AsyncTaskManager() = default;
1054
/// Destructor of the public handle class; defaulted, defined out of line.
1055 AsyncTaskManager::~AsyncTaskManager() = default;
1056
1057 AsyncTaskManager AsyncTaskManager::Get()
1058 {
1059   return Internal::Adaptor::AsyncTaskManager::Get();
1060 }
1061
1062 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
1063 {
1064   Internal::Adaptor::GetImplementation(*this).AddTask(task);
1065 }
1066
1067 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
1068 {
1069   Internal::Adaptor::GetImplementation(*this).RemoveTask(task);
1070 }
1071
1072 AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, AsyncTaskManager::CompletedCallbackTraceMask mask)
1073 {
1074   return Internal::Adaptor::GetImplementation(*this).SetCompletedCallback(callback, mask);
1075 }
1076
1077 bool AsyncTaskManager::RemoveCompletedCallback(AsyncTaskManager::TasksCompletedId tasksCompletedId)
1078 {
1079   return Internal::Adaptor::GetImplementation(*this).RemoveCompletedCallback(tasksCompletedId);
1080 }
1081
/// Constructs a handle around the given implementation object by passing
/// @p impl on to the BaseHandle constructor.
1082 AsyncTaskManager::AsyncTaskManager(Internal::Adaptor::AsyncTaskManager* impl)
1083 : BaseHandle(impl)
1084 {
1085 }
1086
1087 } // namespace Dali
1088
1089 namespace Test
1090 {
1091 namespace AsyncTaskManager
1092 {
1093 void DestroyAsyncTaskManager()
1094 {
1095   Dali::Internal::Adaptor::gAsyncTaskManager.Reset();
1096 }
1097
1098 void ProcessSingleCompletedTask()
1099 {
1100   auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1101   Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskCompleted();
1102 }
1103
1104 void ProcessAllCompletedTask()
1105 {
1106   auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1107   Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskAllCompleted();
1108 }
1109 } // namespace AsyncTaskManager
1110 } // namespace Test