Merge "Remove Atlas parameter for TextureManager cache system" into devel/master
[platform/core/uifw/dali-toolkit.git] / automated-tests / src / dali-toolkit / dali-toolkit-test-utils / toolkit-async-task-manager.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 // CLASS HEADER
19 #include <toolkit-async-task-manager.h>
20
21 // EXTERNAL INCLUDE
22 #include <dali/devel-api/adaptor-framework/thread-settings.h>
23 #include <dali/devel-api/threading/conditional-wait.h>
24 #include <dali/devel-api/threading/mutex.h>
25 #include <dali/devel-api/threading/thread.h>
26 #include <dali/public-api/adaptor-framework/async-task-manager.h>
27 #include <dali/public-api/adaptor-framework/round-robin-container-view.h>
28 #include <dali/public-api/common/list-wrapper.h>
29 #include <dali/public-api/object/base-object.h>
30 #include <memory>
31 #include <unordered_map>
32
33 // INTERNAL INCLUDE
34 #include <toolkit-application.h>
35 #include <toolkit-environment-variable.h>
36 #include <toolkit-event-thread-callback.h>
37
38 namespace Dali
39 {
40 namespace Internal
41 {
42 namespace Adaptor
43 {
namespace
{
/// Running counter; each AsyncTaskThread pre-increments it to obtain the id used in its thread name.
std::atomic_uint32_t gThreadId{0u};
} // namespace
48
49 class AsyncTaskThread;
50
51 class AsyncTaskManager : public Dali::BaseObject
52 {
53 public:
54   static Dali::AsyncTaskManager Get();
55
56   AsyncTaskManager();
57   ~AsyncTaskManager();
58
59 public:
60   void AddTask(AsyncTaskPtr task);
61   void RemoveTask(AsyncTaskPtr task);
62
63   Dali::AsyncTaskManager::TasksCompletedId SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask);
64   bool                                     RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId);
65
66 public:
67   AsyncTaskPtr PopNextCompletedTask();
68   void         TaskCompleted();
69   void         TaskAllCompleted();
70
71 public: // Worker thread called method
72   AsyncTaskPtr PopNextTaskToProcess();
73   void         CompleteTask(AsyncTaskPtr&& task);
74
75 private: /**
76    * @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
77    */
78   class TaskHelper
79   {
80   public:
81     /**
82      * @brief Create an TaskHelper.
83      *
84      * @param[in] asyncTaskManager Reference to the AsyncTaskManager
85      */
86     TaskHelper(AsyncTaskManager& asyncTaskManager);
87
88     /**
89      * @brief Request the thread to process the task.
90      * @return True if the request succeeds, otherwise false.
91      */
92     bool Request();
93
94   public:
95     TaskHelper(const TaskHelper&) = delete;
96     TaskHelper& operator=(const TaskHelper&) = delete;
97
98     TaskHelper(TaskHelper&& rhs);
99     TaskHelper& operator=(TaskHelper&& rhs) = delete;
100
101   private:
102     /**
103      * @brief Main constructor that used by all other constructors
104      */
105     TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager);
106
107   private:
108     std::unique_ptr<AsyncTaskThread> mProcessor;
109     AsyncTaskManager&                mAsyncTaskManager;
110   };
111
112   /**
113    * @brief State of running task
114    */
115   enum class RunningTaskState
116   {
117     RUNNING  = 0, ///< Running task
118     CANCELED = 1, ///< Canceled by user
119   };
120
121   /**
122    * @brief State of complete task
123    */
124   enum class CompletedTaskState
125   {
126     REQUIRE_CALLBACK = 0, ///< Need to execute callback when completed task process.
127     SKIP_CALLBACK    = 1, ///< Do not execute callback
128   };
129
130 private:
131   // Undefined
132   AsyncTaskManager(const AsyncTaskManager& manager);
133
134   // Undefined
135   AsyncTaskManager& operator=(const AsyncTaskManager& manager);
136
137 private:
138   // Keep Task as list since we take tasks by FIFO as default.
139   using AsyncTaskContainer = std::list<AsyncTaskPtr>;
140
141   using AsyncRunningTaskPair      = std::pair<AsyncTaskPtr, RunningTaskState>;
142   using AsyncRunningTaskContainer = std::list<AsyncRunningTaskPair>;
143
144   using AsyncCompletedTaskPair      = std::pair<AsyncTaskPtr, CompletedTaskState>;
145   using AsyncCompletedTaskContainer = std::list<AsyncCompletedTaskPair>;
146
147   AsyncTaskContainer          mWaitingTasks;   ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
148   AsyncRunningTaskContainer   mRunningTasks;   ///< The queue of the running tasks. Must be locked under mRunningTasksMutex.
149   AsyncCompletedTaskContainer mCompletedTasks; ///< The queue of the tasks with the async process. Must be locked under mCompletedTasksMutex.
150
151   RoundRobinContainerView<TaskHelper> mTasks;
152
153   Dali::Mutex mWaitingTasksMutex;   ///< Mutex for mWaitingTasks. We can lock mRunningTasksMutex and mCompletedTasksMutex under this scope.
154   Dali::Mutex mRunningTasksMutex;   ///< Mutex for mRunningTasks. We can lock mCompletedTasksMutex under this scope.
155   Dali::Mutex mCompletedTasksMutex; ///< Mutex for mCompletedTasks. We cannot lock any mutex under this scope.
156
157   std::unique_ptr<EventThreadCallback> mTrigger;
158
159   struct TasksCompletedImpl;
160   std::unique_ptr<TasksCompletedImpl> mTasksCompletedImpl; ///< TaskS completed signal interface for AsyncTaskManager.
161 };
162
163 inline Internal::Adaptor::AsyncTaskManager& GetImplementation(Dali::AsyncTaskManager& obj)
164 {
165   DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
166
167   Dali::BaseObject& handle = obj.GetBaseObject();
168
169   return static_cast<Internal::Adaptor::AsyncTaskManager&>(handle);
170 }
171
172 inline const Internal::Adaptor::AsyncTaskManager& GetImplementation(const Dali::AsyncTaskManager& obj)
173 {
174   DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
175
176   const Dali::BaseObject& handle = obj.GetBaseObject();
177
178   return static_cast<const Internal::Adaptor::AsyncTaskManager&>(handle);
179 }
180
181 /********************************************************************************/
182 /*********************************  INTERNAL CLASS  *****************************/
183 /********************************************************************************/
184
185 namespace
186 {
187 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
188
189 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
190 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
191
192 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
193 {
194   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
195   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
196   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
197   DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
198   return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
199 }
200
201 thread_local Dali::AsyncTaskManager gAsyncTaskManager;
202 } // namespace
203
204 /**
205  * The worker thread for async process
206  */
207 class AsyncTaskThread : public Thread
208 {
209 public:
210   /**
211    * Constructor.
212    */
213   AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
214   : mConditionalWait(),
215     mAsyncTaskManager(asyncTaskManager),
216     mThreadId(++gThreadId),
217     mDestroyThread(false),
218     mIsThreadStarted(false),
219     mIsThreadIdle(true)
220   {
221   }
222
223   /**
224    * Destructor.
225    */
226   ~AsyncTaskThread() override
227   {
228     // Stop the thread
229     {
230       ConditionalWait::ScopedLock lock(mConditionalWait);
231       mDestroyThread = true;
232       mConditionalWait.Notify(lock);
233     }
234
235     Join();
236   }
237
238   /**
239    * @brief Request the thread to process the task.
240    * @return True if the request is successed, otherwise false.
241    */
242   bool Request()
243   {
244     if(!mIsThreadStarted)
245     {
246       Start();
247       mIsThreadStarted = true;
248     }
249
250     {
251       // Lock while adding task to the queue
252       ConditionalWait::ScopedLock lock(mConditionalWait);
253
254       if(mIsThreadIdle)
255       {
256         mIsThreadIdle = false;
257
258         // wake up the thread
259         mConditionalWait.Notify(lock);
260         return true;
261       }
262     }
263
264     return false;
265   }
266
267 protected:
268   /**
269    * The entry function of the worker thread.
270    */
271   void Run() override
272   {
273     {
274       char temp[100];
275       snprintf(temp, 100, "AsyncTaskThread[%u]", mThreadId);
276       SetThreadName(temp);
277     }
278
279     while(!mDestroyThread)
280     {
281       AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
282       if(!task)
283       {
284         ConditionalWait::ScopedLock lock(mConditionalWait);
285         if(!mDestroyThread)
286         {
287           mIsThreadIdle = true;
288           mConditionalWait.Wait(lock);
289         }
290       }
291       else
292       {
293         task->Process();
294         if(!mDestroyThread)
295         {
296           mAsyncTaskManager.CompleteTask(std::move(task));
297         }
298       }
299     }
300   }
301
302 private:
303   // Undefined
304   AsyncTaskThread(const AsyncTaskThread& thread) = delete;
305
306   // Undefined
307   AsyncTaskThread& operator=(const AsyncTaskThread& thread) = delete;
308
309 private:
310   ConditionalWait   mConditionalWait;
311   AsyncTaskManager& mAsyncTaskManager;
312   uint32_t          mThreadId;
313   bool              mDestroyThread;
314   bool              mIsThreadStarted;
315   bool              mIsThreadIdle;
316 };
317
318 // AsyncTaskManager::TasksCompletedImpl
319
320 struct AsyncTaskManager::TasksCompletedImpl
321 {
322   TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
323   : mManager(manager),
324     mTrigger(trigger),
325     mEmitCompletedTaskTriggered(false)
326   {
327   }
328
329 public:
330   /**
331    * @brief Create new tasks completed id and.
332    * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
333    * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
334    */
335   Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
336   {
337     // Lock while adding tasks completed callback list to the queue
338     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
339
340     auto id = mTasksCompletedCount++;
341     DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
342
343     mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
344     return id;
345   }
346
347   /**
348    * @brief Append task that will be trace.
349    * @post RemoveTaskTrace should be called.
350    * @param[in] id The id of tasks completed.
351    * @param[in] task The task want to trace.
352    */
353   void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
354   {
355     // Lock while adding tasks completed callback list to the queue
356     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
357
358     auto iter = mTasksCompletedCallbackList.find(id);
359     if(iter == mTasksCompletedCallbackList.end())
360     {
361       // This task is already erased. Ignore.
362       return;
363     }
364
365     auto& callbackData = iter->second;
366
367     auto jter = callbackData.mTasks.find(task.Get());
368
369     if(jter != callbackData.mTasks.end())
370     {
371       // Increase reference count.
372       ++(jter->second);
373     }
374     else
375     {
376       callbackData.mTasks.insert({task.Get(), 1u});
377     }
378   }
379
380   /**
381    * @brief Remove all task that were traced.
382    * @param[in] task The task want to remove trace.
383    * @param[in] taskCount The number of tasks that will be removed.
384    */
385   void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
386   {
387     if(count == 0u)
388     {
389       return;
390     }
391
392     // Lock while removing tasks completed callback list to the queue
393     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
394
395     for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
396     {
397       auto& callbackData      = iter->second;
398       bool  eraseCallbackData = false;
399
400       auto jter = callbackData.mTasks.find(task.Get());
401
402       if(jter != callbackData.mTasks.end())
403       {
404         if(jter->second <= count)
405         {
406           callbackData.mTasks.erase(jter);
407
408           if(callbackData.mTasks.empty())
409           {
410             eraseCallbackData = true;
411
412             // Move callback base into list.
413             // (To avoid task container changed during callback emit)
414             RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
415
416             iter = mTasksCompletedCallbackList.erase(iter);
417           }
418         }
419         else
420         {
421           jter->second -= count;
422         }
423       }
424
425       if(!eraseCallbackData)
426       {
427         ++iter;
428       }
429     }
430   }
431
432   /**
433    * @brief Check whether current TasksCompletedId completed or not.
434    * @param[in] id The id of tasks completed.
435    * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
436    */
437   bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
438   {
439     // Lock while removing tasks completed callback list to the queue
440     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
441
442     auto iter = mTasksCompletedCallbackList.find(id);
443     if(iter != mTasksCompletedCallbackList.end())
444     {
445       auto& callbackData = iter->second;
446       if(callbackData.mTasks.empty())
447       {
448         // Move callback base into list.
449         // (To avoid task container changed during callback emit)
450         RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
451
452         iter = mTasksCompletedCallbackList.erase(iter);
453
454         return true;
455       }
456     }
457
458     return false;
459   }
460
461   /**
462    * @brief Remove taskS completed callbacks by id.
463    * @param[in] id The id of taskS completed.
464    * @return True if taskS completed id removed. False otherwise.
465    */
466   bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
467   {
468     // Lock while removing taskS completed callback list to the queue
469     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
470
471     auto iter = mTasksCompletedCallbackList.find(id);
472     if(iter == mTasksCompletedCallbackList.end())
473     {
474       // This task is already erased, or completed.
475       // Erase from completed excute callback list.
476
477       // Lock while removing excute callback list to the queue
478       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
479
480       for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
481       {
482         if(iter->second == id)
483         {
484           iter = mExcuteCallbackList.erase(iter);
485
486           return true;
487         }
488         else
489         {
490           ++iter;
491         }
492       }
493
494       // This task is alread erased and completed. Ignore.
495       return false;
496     }
497
498     mTasksCompletedCallbackList.erase(iter);
499
500     return true;
501   }
502
503   /**
504    * @brief Emit all completed callbacks.
505    * @note This API should be called at event thread.
506    */
507   void EmitCompletedTasks()
508   {
509     ExecuteCallbackContainer executeCallbackList;
510     {
511       // Lock while removing excute callback list to the queue
512       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
513
514       mEmitCompletedTaskTriggered = false;
515
516       // Copy callback lists, for let we execute callbacks out of mutex
517       executeCallbackList = std::move(mExcuteCallbackList);
518       mExcuteCallbackList.clear();
519     }
520
521     if(!executeCallbackList.empty())
522     {
523       // Execute all callbacks
524       for(auto&& callbackPair : executeCallbackList)
525       {
526         auto& callback = callbackPair.first;
527         auto  id       = callbackPair.second;
528
529         Dali::CallbackBase::Execute(*callback, id);
530       }
531     }
532   }
533
534   /**
535    * @brief Check whether there is some completed signal what we need to trace, or not.
536    * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
537    */
538   bool IsTasksCompletedCallbackExist()
539   {
540     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
541     return !mTasksCompletedCallbackList.empty();
542   }
543
544   /**
545    * @brief Check whether there is some completed signal what we need to execute, or not.
546    * @return True if mExcuteCallbackList is not empty. False otherwise.
547    */
548   bool IsExecuteCallbackExist()
549   {
550     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
551     return !mExcuteCallbackList.empty();
552   }
553
554 private:
555   void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
556   {
557     // Lock while adding excute callback list to the queue
558     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
559
560     mExcuteCallbackList.emplace_back(std::move(callback), id);
561
562     if(!mEmitCompletedTaskTriggered)
563     {
564       mEmitCompletedTaskTriggered = true;
565       mTrigger->Trigger();
566     }
567   }
568
569 private:
570   struct CallbackData
571   {
572   public:
573     CallbackData(CallbackBase* callback)
574     : mCallback(callback),
575       mTasks()
576     {
577     }
578
579     CallbackData(CallbackData&& rhs) noexcept
580     : mCallback(std::move(rhs.mCallback)),
581       mTasks(std::move(rhs.mTasks))
582     {
583     }
584
585     CallbackData& operator=(CallbackData&& rhs) noexcept
586     {
587       if(this != &rhs)
588       {
589         mCallback = std::move(rhs.mCallback);
590         mTasks    = std::move(rhs.mTasks);
591       }
592
593       return *this;
594     }
595
596   private:
597     // Delete copy operator.
598     CallbackData(const CallbackData& rhs) = delete;
599     CallbackData& operator=(const CallbackData& rhs) = delete;
600
601   public:
602     std::unique_ptr<CallbackBase>                  mCallback;
603     std::unordered_map<const AsyncTask*, uint32_t> mTasks;
604   };
605
606 private:
607   AsyncTaskManager&    mManager; ///< Owner of this CacheImpl.
608   EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
609
610   Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
611
612   using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
613   TasksCompletedContainer mTasksCompletedCallbackList;
614
615   using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
616   ExecuteCallbackContainer mExcuteCallbackList;
617
618   Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
619   Dali::Mutex mExcuteCallbacksMutex;         ///< Mutex for mExcuteCallbackList.
620
621   bool mEmitCompletedTaskTriggered : 1;
622 };
623
624 // AsyncTaskManager
625
626 Dali::AsyncTaskManager AsyncTaskManager::Get()
627 {
628   if(!gAsyncTaskManager)
629   {
630     gAsyncTaskManager = Dali::AsyncTaskManager(new AsyncTaskManager());
631   }
632   return gAsyncTaskManager;
633 }
634
635 AsyncTaskManager::AsyncTaskManager()
636 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
637   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TaskCompleted))),
638   mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get()))
639 {
640 }
641
642 AsyncTaskManager::~AsyncTaskManager()
643 {
644   // Join all threads.
645   mTasks.Clear();
646
647   // Remove task completed impl after all threads are join.
648   mTasksCompletedImpl.reset();
649
650   // Remove tasks
651   mWaitingTasks.clear();
652   mRunningTasks.clear();
653   mCompletedTasks.clear();
654 }
655
656 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
657 {
658   if(task)
659   {
660     // Lock while adding task to the queue
661     Mutex::ScopedLock lock(mWaitingTasksMutex);
662
663     // push back into waiting queue.
664     mWaitingTasks.insert(mWaitingTasks.end(), task);
665
666     {
667       // For thread safety
668       Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
669
670       // Finish all Running threads are working
671       if(mRunningTasks.size() >= mTasks.GetElementCount())
672       {
673         return;
674       }
675     }
676   }
677
678   size_t count = mTasks.GetElementCount();
679   size_t index = 0;
680   while(index++ < count)
681   {
682     auto processHelperIt = mTasks.GetNext();
683     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
684     if(processHelperIt->Request())
685     {
686       break;
687     }
688     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
689   }
690 }
691
692 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
693 {
694   if(task)
695   {
696     uint32_t removedCount = 0u;
697
698     {
699       // Lock while remove task from the queue
700       Mutex::ScopedLock lock(mWaitingTasksMutex);
701
702       for(auto iterator = mWaitingTasks.begin(); iterator != mWaitingTasks.end();)
703       {
704         if((*iterator) == task)
705         {
706           iterator = mWaitingTasks.erase(iterator);
707           ++removedCount;
708         }
709         else
710         {
711           ++iterator;
712         }
713       }
714     }
715
716     {
717       // Lock while remove task from the queue
718       Mutex::ScopedLock lock(mRunningTasksMutex);
719
720       for(auto iterator = mRunningTasks.begin(); iterator != mRunningTasks.end();)
721       {
722         if((*iterator).first == task)
723         {
724           // We cannot erase container. Just mark as canceled.
725           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
726           (*iterator).second = RunningTaskState::CANCELED;
727           ++removedCount;
728         }
729         ++iterator;
730       }
731     }
732
733     {
734       // Lock while remove task from the queue
735       Mutex::ScopedLock lock(mCompletedTasksMutex);
736
737       for(auto iterator = mCompletedTasks.begin(); iterator != mCompletedTasks.end();)
738       {
739         if((*iterator).first == task)
740         {
741           iterator = mCompletedTasks.erase(iterator);
742           ++removedCount;
743         }
744         else
745         {
746           ++iterator;
747         }
748       }
749     }
750
751     // Remove TasksCompleted callback trace
752     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist() && removedCount > 0u)
753     {
754       mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
755     }
756   }
757 }
758
759 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
760 {
761   // mTasksCompletedImpl will take ownership of callback.
762   Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
763
764   bool taskAdded = false; ///< Flag whether at least one task tracing now.
765
766   // Please be careful the order of mutex, to avoid dead lock.
767   {
768     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
769     {
770       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
771       {
772         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
773
774         // Collect all tasks from waiting tasks
775         for(auto& task : mWaitingTasks)
776         {
777           auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
778                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
779
780           if((checkMask & mask) == checkMask)
781           {
782             taskAdded = true;
783             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
784           }
785         }
786
787         // Collect all tasks from running tasks
788         for(auto& taskPair : mRunningTasks)
789         {
790           auto& task      = taskPair.first;
791           auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
792                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
793
794           if((checkMask & mask) == checkMask)
795           {
796             taskAdded = true;
797             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
798           }
799         }
800
801         // Collect all tasks from complete tasks
802         for(auto& taskPair : mCompletedTasks)
803         {
804           auto& task      = taskPair.first;
805           auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
806                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
807
808           if((checkMask & mask) == checkMask)
809           {
810             taskAdded = true;
811             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
812           }
813         }
814       }
815     }
816   }
817
818   // If there is nothing to check task, just excute callback right now.
819   if(!taskAdded)
820   {
821     mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
822   }
823   return tasksCompletedId;
824 }
825
826 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
827 {
828   return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
829 }
830
831 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
832 {
833   std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
834
835   AsyncTaskPtr nextCompletedTask = nullptr;
836   {
837     // Lock while popping task out from the queue
838     Mutex::ScopedLock lock(mCompletedTasksMutex);
839
840     while(!mCompletedTasks.empty())
841     {
842       auto               next      = mCompletedTasks.begin();
843       AsyncTaskPtr       nextTask  = next->first;
844       CompletedTaskState taskState = next->second;
845       mCompletedTasks.erase(next);
846
847       if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
848       {
849         nextCompletedTask = nextTask;
850         break;
851       }
852
853       ignoredTaskList.push_back(nextTask);
854     }
855   }
856
857   return nextCompletedTask;
858 }
859
860 void AsyncTaskManager::TaskCompleted()
861 {
862   // For UTC, let we complete only 1 task here.
863   if(AsyncTaskPtr task = PopNextCompletedTask())
864   {
865     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
866
867     // Remove TasksCompleted callback trace
868     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
869     {
870       mTasksCompletedImpl->RemoveTaskTrace(task);
871     }
872   }
873
874   mTasksCompletedImpl->EmitCompletedTasks();
875 }
876
877 void AsyncTaskManager::TaskAllCompleted()
878 {
879   while(AsyncTaskPtr task = PopNextCompletedTask())
880   {
881     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
882
883     // Remove TasksCompleted callback trace
884     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
885     {
886       mTasksCompletedImpl->RemoveTaskTrace(task);
887     }
888   }
889
890   mTasksCompletedImpl->EmitCompletedTasks();
891 }
892
893 /// Worker thread called
894 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
895 {
896   // Lock while popping task out from the queue
897   Mutex::ScopedLock lock(mWaitingTasksMutex);
898
899   // pop out the next task from the queue
900   AsyncTaskPtr nextTask = nullptr;
901
902   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
903   {
904     if((*iter)->IsReady())
905     {
906       nextTask = *iter;
907
908       // Add Running queue
909       {
910         // Lock while popping task out from the queue
911         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
912
913         mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
914
915         mWaitingTasks.erase(iter);
916       }
917       break;
918     }
919   }
920
921   return nextTask;
922 }
923
924 /// Worker thread called
925 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
926 {
927   bool notify = false;
928
929   if(task)
930   {
931     bool needTrigger = false;
932
933     // Lock while check validation of task.
934     {
935       Mutex::ScopedLock lock(mRunningTasksMutex);
936
937       auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
938       if(iter != mRunningTasks.end())
939       {
940         if(iter->second == RunningTaskState::RUNNING)
941         {
942           // This task is valid.
943           notify = true;
944         }
945       }
946     }
947
948     // We should execute this tasks complete callback out of mutex
949     if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
950     {
951       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
952
953       // We need to remove task trace now.
954       if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
955       {
956         mTasksCompletedImpl->RemoveTaskTrace(task);
957
958         if(mTasksCompletedImpl->IsExecuteCallbackExist())
959         {
960           // We need to call EmitCompletedTasks(). Trigger main thread.
961           needTrigger = true;
962         }
963       }
964     }
965
966     // Lock while adding task to the queue
967     {
968       Mutex::ScopedLock lock(mRunningTasksMutex);
969
970       auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
971       if(iter != mRunningTasks.end())
972       {
973         // Move task into completed, for ensure that AsyncTask destroy at main thread.
974         {
975           Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
976
977           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
978
979           needTrigger |= callbackRequired;
980
981           mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
982
983           mRunningTasks.erase(iter);
984
985           if(!needTrigger)
986           {
987             needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
988           }
989
990           // Now, task is invalidate.
991           task.Reset();
992         }
993       }
994     }
995
996     // Wake up the main thread
997     if(needTrigger)
998     {
999       mTrigger->Trigger();
1000     }
1001   }
1002 }
1003
1004 // AsyncTaskManager::TaskHelper
1005
1006 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1007 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1008 {
1009 }
1010
1011 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1012 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1013 {
1014 }
1015
/// Stores the given processor (ownership is moved into mProcessor) together with the manager reference.
/// The other TaskHelper constructors delegate to this one.
AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
: mProcessor(std::move(processor)),
  mAsyncTaskManager(asyncTaskManager)
{
}
1021
1022 bool AsyncTaskManager::TaskHelper::Request()
1023 {
1024   return mProcessor->Request();
1025 }
1026
1027 } // namespace Adaptor
1028
1029 } // namespace Internal
1030
1031 /********************************************************************************/
1032 /*********************************  PUBLIC CLASS  *******************************/
1033 /********************************************************************************/
1034
// Compiler-generated default constructor of the public handle class.
AsyncTaskManager::AsyncTaskManager() = default;

// Compiler-generated destructor of the public handle class.
AsyncTaskManager::~AsyncTaskManager() = default;
1038
1039 AsyncTaskManager AsyncTaskManager::Get()
1040 {
1041   return Internal::Adaptor::AsyncTaskManager::Get();
1042 }
1043
1044 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
1045 {
1046   Internal::Adaptor::GetImplementation(*this).AddTask(task);
1047 }
1048
1049 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
1050 {
1051   Internal::Adaptor::GetImplementation(*this).RemoveTask(task);
1052 }
1053
1054 AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, AsyncTaskManager::CompletedCallbackTraceMask mask)
1055 {
1056   return Internal::Adaptor::GetImplementation(*this).SetCompletedCallback(callback, mask);
1057 }
1058
1059 bool AsyncTaskManager::RemoveCompletedCallback(AsyncTaskManager::TasksCompletedId tasksCompletedId)
1060 {
1061   return Internal::Adaptor::GetImplementation(*this).RemoveCompletedCallback(tasksCompletedId);
1062 }
1063
// Constructs the public handle from an internal implementation pointer by
// passing it on to the BaseHandle constructor.
AsyncTaskManager::AsyncTaskManager(Internal::Adaptor::AsyncTaskManager* impl)
: BaseHandle(impl)
{
}
1068
1069 } // namespace Dali
1070
1071 namespace Test
1072 {
1073 namespace AsyncTaskManager
1074 {
1075 void DestroyAsyncTaskManager()
1076 {
1077   Dali::Internal::Adaptor::gAsyncTaskManager.Reset();
1078 }
1079
1080 void ProcessSingleCompletedTask()
1081 {
1082   auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1083   Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskCompleted();
1084 }
1085
1086 void ProcessAllCompletedTask()
1087 {
1088   auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1089   Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskAllCompleted();
1090 }
1091 } // namespace AsyncTaskManager
1092 } // namespace Test