Add TasksCompleted signal at AsyncTaskManager
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
// CLASS HEADER
#include <dali/internal/system/common/async-task-manager-impl.h>

// EXTERNAL INCLUDES
#include <dali/devel-api/adaptor-framework/environment-variable.h>
#include <dali/devel-api/adaptor-framework/thread-settings.h>
#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>

#include <atomic>
#include <unordered_map>
29
30 namespace Dali
31 {
32 namespace Internal
33 {
34 namespace Adaptor
35 {
36 namespace
37 {
constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Force a TasksCompleted() trigger once the number of completed tasks grows too large.

// Size of the worker thread pool: built-in default, and the environment variable that can override it.
constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
constexpr auto NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";

// The number of threads for low priority task: built-in default, and the environment variable that can override it.
constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
constexpr auto NUMBER_OF_LOW_PRIORITY_THREADS_ENV     = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
46
47 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
48 {
49   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
50   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
51   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
52   DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
53   return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
54 }
55
56 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
57 {
58   auto numberString    = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
59   auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
60   DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
61   return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
62 }
63
#if defined(DEBUG_ENABLED)
/// Log filter used by every DALI_LOG_INFO call in this file.
Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");

/// Source of the per-thread debug index. Atomic because each worker thread
/// post-increments it from its own AsyncTaskThread::Run().
std::atomic<uint32_t> gThreadId{0u}; // Only for debug
#endif
69
70 } // unnamed namespace
71
72 // AsyncTaskThread
73
/**
 * @brief Construct a worker bound to the given manager.
 *
 * The log and trace factories are taken from the current adaptor.
 * The thread itself is not started here: it starts out flagged as not started and idle,
 * and Request() calls Start() on first use.
 *
 * @param[in] asyncTaskManager The manager this worker pops tasks from and reports completion to.
 */
AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
: mConditionalWait(),
  mAsyncTaskManager(asyncTaskManager),
  mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
  mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
  mDestroyThread(false),
  mIsThreadStarted(false),
  mIsThreadIdle(true)
{
}
84
/**
 * @brief Ask the worker loop to exit, wake it if it is waiting, then join it.
 */
AsyncTaskThread::~AsyncTaskThread()
{
  // Stop the thread
  {
    // Set the flag and notify under the same lock that Run() holds while it checks the flag and waits.
    ConditionalWait::ScopedLock lock(mConditionalWait);
    mDestroyThread = true;
    mConditionalWait.Notify(lock);
  }

  // The lock is released before joining.
  Join();
}
96
97 bool AsyncTaskThread::Request()
98 {
99   if(!mIsThreadStarted)
100   {
101     Start();
102     mIsThreadStarted = true;
103   }
104
105   {
106     // Lock while adding task to the queue
107     ConditionalWait::ScopedLock lock(mConditionalWait);
108
109     if(mIsThreadIdle)
110     {
111       mIsThreadIdle = false;
112
113       // wake up the thread
114       mConditionalWait.Notify(lock);
115       return true;
116     }
117   }
118
119   return false;
120 }
121
122 void AsyncTaskThread::Run()
123 {
124 #if defined(DEBUG_ENABLED)
125   uint32_t threadId = gThreadId++;
126   {
127     char temp[100];
128     snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
129     SetThreadName(temp);
130   }
131 #else
132   SetThreadName("AsyncTaskThread");
133 #endif
134   mLogFactory.InstallLogFunction();
135   mTraceFactory.InstallTraceFunction();
136
137   while(!mDestroyThread)
138   {
139     AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
140     if(!task)
141     {
142       ConditionalWait::ScopedLock lock(mConditionalWait);
143       if(!mDestroyThread)
144       {
145         mIsThreadIdle = true;
146         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
147         mConditionalWait.Wait(lock);
148         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
149       }
150     }
151     else
152     {
153       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
154       task->Process();
155       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
156       if(!mDestroyThread)
157       {
158         mAsyncTaskManager.CompleteTask(std::move(task));
159       }
160     }
161   }
162 }
163
164 // AsyncTaskManager::TasksCompletedImpl
165
166 struct AsyncTaskManager::TasksCompletedImpl
167 {
168   TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
169   : mManager(manager),
170     mTrigger(trigger),
171     mEmitCompletedTaskTriggered(false)
172   {
173   }
174
175 public:
176   /**
177    * @brief Create new tasks completed id and.
178    * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
179    * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
180    */
181   Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
182   {
183     // Lock while adding tasks completed callback list to the queue
184     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
185
186     auto id = mTasksCompletedCount++;
187     DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
188
189     mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
190
191     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "GenerateTasksCompletedId id[%u] callback[%p]\n", id, callback);
192     return id;
193   }
194
195   /**
196    * @brief Append task that will be trace.
197    * @post RemoveTaskTrace should be called.
198    * @param[in] id The id of tasks completed.
199    * @param[in] task The task want to trace.
200    */
201   void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
202   {
203     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AppendTaskTrace id[%u] task[%p]\n", id, task.Get());
204
205     // Lock while adding tasks completed callback list to the queue
206     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
207
208     auto iter = mTasksCompletedCallbackList.find(id);
209     if(iter == mTasksCompletedCallbackList.end())
210     {
211       // This task is already erased. Ignore.
212       return;
213     }
214
215     auto& callbackData = iter->second;
216
217     auto jter = callbackData.mTasks.find(task.Get());
218
219     if(jter != callbackData.mTasks.end())
220     {
221       // Increase reference count.
222       ++(jter->second);
223     }
224     else
225     {
226       callbackData.mTasks.insert({task.Get(), 1u});
227     }
228   }
229
230   /**
231    * @brief Remove all task that were traced.
232    * @param[in] task The task want to remove trace.
233    * @param[in] taskCount The number of tasks that will be removed.
234    */
235   void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
236   {
237     if(count == 0u)
238     {
239       return;
240     }
241     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace task[%p] remove count[%u]\n", task.Get(), count);
242
243     // Lock while removing tasks completed callback list to the queue
244     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
245
246     for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
247     {
248       auto& callbackData      = iter->second;
249       bool  eraseCallbackData = false;
250
251       auto jter = callbackData.mTasks.find(task.Get());
252
253       if(jter != callbackData.mTasks.end())
254       {
255         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task[%p], current refcount[%u]\n", iter->first, task.Get(), (jter->second));
256
257         if(jter->second <= count)
258         {
259           callbackData.mTasks.erase(jter);
260
261           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task erased. remained tasks[%zu]", iter->first, callbackData.mTasks.size());
262
263           if(callbackData.mTasks.empty())
264           {
265             eraseCallbackData = true;
266
267             // Move callback base into list.
268             // (To avoid task container changed during callback emit)
269             RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
270
271             DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
272
273             iter = mTasksCompletedCallbackList.erase(iter);
274           }
275         }
276         else
277         {
278           jter->second -= count;
279         }
280       }
281
282       if(!eraseCallbackData)
283       {
284         ++iter;
285       }
286     }
287   }
288
289   /**
290    * @brief Check whether current TasksCompletedId completed or not.
291    * @param[in] id The id of tasks completed.
292    * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
293    */
294   bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
295   {
296     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CheckTasksCompletedCallbackCompleted[%u]\n", id);
297
298     // Lock while removing tasks completed callback list to the queue
299     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
300
301     auto iter = mTasksCompletedCallbackList.find(id);
302     if(iter != mTasksCompletedCallbackList.end())
303     {
304       auto& callbackData = iter->second;
305       if(callbackData.mTasks.empty())
306       {
307         // Move callback base into list.
308         // (To avoid task container changed during callback emit)
309         RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
310
311         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
312
313         iter = mTasksCompletedCallbackList.erase(iter);
314
315         return true;
316       }
317     }
318
319     return false;
320   }
321
322   /**
323    * @brief Remove taskS completed callbacks by id.
324    * @param[in] id The id of taskS completed.
325    * @return True if taskS completed id removed. False otherwise.
326    */
327   bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
328   {
329     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTasksCompleted[%u]\n", id);
330
331     // Lock while removing taskS completed callback list to the queue
332     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
333
334     auto iter = mTasksCompletedCallbackList.find(id);
335     if(iter == mTasksCompletedCallbackList.end())
336     {
337       // This task is already erased, or completed.
338       // Erase from completed excute callback list.
339
340       // Lock while removing excute callback list to the queue
341       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
342
343       for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
344       {
345         if(iter->second == id)
346         {
347           iter = mExcuteCallbackList.erase(iter);
348
349           return true;
350         }
351         else
352         {
353           ++iter;
354         }
355       }
356
357       // This task is alread erased and completed. Ignore.
358       return false;
359     }
360
361     mTasksCompletedCallbackList.erase(iter);
362
363     return true;
364   }
365
366   /**
367    * @brief Emit all completed callbacks.
368    * @note This API should be called at event thread.
369    */
370   void EmitCompletedTasks()
371   {
372     ExecuteCallbackContainer executeCallbackList;
373     {
374       // Lock while removing excute callback list to the queue
375       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
376
377       mEmitCompletedTaskTriggered = false;
378
379       // Copy callback lists, for let we execute callbacks out of mutex
380       executeCallbackList = std::move(mExcuteCallbackList);
381       mExcuteCallbackList.clear();
382     }
383
384     if(!executeCallbackList.empty())
385     {
386       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback count[%zu]\n", executeCallbackList.size());
387       // Execute all callbacks
388       for(auto&& callbackPair : executeCallbackList)
389       {
390         auto& callback = callbackPair.first;
391         auto  id       = callbackPair.second;
392
393         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute taskS completed callback[%p] for id[%u]\n", callback.get(), id);
394
395         Dali::CallbackBase::Execute(*callback, id);
396       }
397
398       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback end\n");
399     }
400   }
401
402   /**
403    * @brief Check whether there is some completed signal what we need to trace, or not.
404    * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
405    */
406   bool IsTasksCompletedCallbackExist()
407   {
408     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
409     return !mTasksCompletedCallbackList.empty();
410   }
411
412   /**
413    * @brief Check whether there is some completed signal what we need to execute, or not.
414    * @return True if mExcuteCallbackList is not empty. False otherwise.
415    */
416   bool IsExecuteCallbackExist()
417   {
418     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
419     return !mExcuteCallbackList.empty();
420   }
421
422 private:
423   void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
424   {
425     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted[%u] need to be execute with callback[%p]\n", id, callback.get());
426
427     // Lock while adding excute callback list to the queue
428     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
429
430     mExcuteCallbackList.emplace_back(std::move(callback), id);
431
432     if(!mEmitCompletedTaskTriggered)
433     {
434       mEmitCompletedTaskTriggered = true;
435
436       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger processor\n");
437       mTrigger->Trigger();
438     }
439   }
440
441 private:
442   struct CallbackData
443   {
444   public:
445     CallbackData(CallbackBase* callback)
446     : mCallback(callback),
447       mTasks()
448     {
449     }
450
451     CallbackData(CallbackData&& rhs) noexcept
452     : mCallback(std::move(rhs.mCallback)),
453       mTasks(std::move(rhs.mTasks))
454     {
455     }
456
457     CallbackData& operator=(CallbackData&& rhs) noexcept
458     {
459       if(this != &rhs)
460       {
461         mCallback = std::move(rhs.mCallback);
462         mTasks    = std::move(rhs.mTasks);
463       }
464
465       return *this;
466     }
467
468   private:
469     // Delete copy operator.
470     CallbackData(const CallbackData& rhs) = delete;
471     CallbackData& operator=(const CallbackData& rhs) = delete;
472
473   public:
474     std::unique_ptr<CallbackBase>                  mCallback;
475     std::unordered_map<const AsyncTask*, uint32_t> mTasks;
476   };
477
478 private:
479   AsyncTaskManager&    mManager; ///< Owner of this CacheImpl.
480   EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
481
482   Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
483
484   using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
485   TasksCompletedContainer mTasksCompletedCallbackList;
486
487   using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
488   ExecuteCallbackContainer mExcuteCallbackList;
489
490   Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
491   Dali::Mutex mExcuteCallbacksMutex;         ///< Mutex for mExcuteCallbackList.
492
493   bool mEmitCompletedTaskTriggered : 1;
494 };
495
496 // AsyncTaskManager::CacheImpl
497
498 struct AsyncTaskManager::CacheImpl
499 {
500   CacheImpl(AsyncTaskManager& manager)
501   : mManager(manager)
502   {
503   }
504
505 public:
506   // Insert / Erase task cache API.
507
508   /**
509    * @brief Insert cache that input task.
510    * @pre Mutex be locked.
511    */
512   template<typename CacheContainer, typename Iterator>
513   static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
514   {
515     auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
516     cacheContainer.insert(cacheContainer.end(), iterator);
517   }
518
519   /**
520    * @brief Erase cache that input task.
521    * @pre Mutex be locked.
522    */
523   template<typename CacheContainer, typename Iterator>
524   static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
525   {
526     auto mapIter = cacheMap.find(task.Get());
527     if(mapIter != cacheMap.end())
528     {
529       auto& cacheContainer = (*mapIter).second;
530       auto  cacheIter      = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
531
532       if(cacheIter != cacheContainer.end())
533       {
534         cacheContainer.erase(cacheIter);
535         if(cacheContainer.empty())
536         {
537           cacheMap.erase(mapIter);
538         }
539       }
540     }
541   }
542
543   /**
544    * @brief Erase all cache that input task.
545    * @pre Mutex be locked.
546    */
547   template<typename CacheContainer>
548   static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
549   {
550     auto mapIter = cacheMap.find(task.Get());
551     if(mapIter != cacheMap.end())
552     {
553       cacheMap.erase(mapIter);
554     }
555   }
556
557 public:
558   AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
559
560   // Keep cache iterators as list since we take tasks by FIFO as default.
561   using TaskCacheContainer          = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
562   using RunningTaskCacheContainer   = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
563   using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
564
565   TaskCacheContainer          mWaitingTasksCache;   ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
566   RunningTaskCacheContainer   mRunningTasksCache;   ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
567   CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
568 };
569
570 // AsyncTaskManager
571
572 Dali::AsyncTaskManager AsyncTaskManager::Get()
573 {
574   Dali::AsyncTaskManager manager;
575   SingletonService       singletonService(SingletonService::Get());
576   if(singletonService)
577   {
578     // Check whether the async task manager is already created
579     Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
580     if(handle)
581     {
582       // If so, downcast the handle of singleton
583       manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
584     }
585
586     if(!manager)
587     {
588       // If not, create the async task manager and register it as a singleton
589       Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
590       manager                                                       = Dali::AsyncTaskManager(internalAsyncTaskManager);
591       singletonService.Register(typeid(manager), manager);
592     }
593   }
594   return manager;
595 }
596
/**
 * @brief Constructor.
 *
 * Sizes the thread helper pool and the low priority task budget from environment variables
 * (falling back to the built-in defaults), and creates the event thread trigger that calls
 * TasksCompleted() together with the two implementation helpers.
 *
 * Two initializers read members initialized before them: the low priority count reads
 * mTasks.GetElementCount(), and mTasksCompletedImpl receives mTrigger.get().
 * NOTE(review): this relies on the member declaration order in the header - confirm before reordering.
 */
AsyncTaskManager::AsyncTaskManager()
: mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
  mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
  mWaitingHighProirityTaskCounts(0u),
  mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
  mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get())),
  mCacheImpl(new CacheImpl(*this)),
  mProcessorRegistered(false)
{
}
607
/**
 * @brief Destructor.
 *
 * Teardown order matters: unregister the processor, join the worker threads, release the
 * implementation helpers, and only then clear the task queues.
 */
AsyncTaskManager::~AsyncTaskManager()
{
  // Unregister only if we registered before and the adaptor still exists.
  if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
  {
    mProcessorRegistered = false;
    Dali::Adaptor::Get().UnregisterProcessor(*this);
  }

  // Join all threads.
  mTasks.Clear();

  // Remove task completed impl and cache impl after all threads are join.
  mTasksCompletedImpl.reset();
  mCacheImpl.reset();

  // Remove tasks after CacheImpl removed
  mWaitingTasks.clear();
  mRunningTasks.clear();
  mCompletedTasks.clear();
}
628
629 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
630 {
631   if(task)
632   {
633     // Lock while adding task to the queue
634     Mutex::ScopedLock lock(mWaitingTasksMutex);
635
636     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
637
638     // push back into waiting queue.
639     auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
640     CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
641
642     if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
643     {
644       // Increase the number of waiting tasks for high priority.
645       ++mWaitingHighProirityTaskCounts;
646     }
647
648     {
649       // For thread safety
650       Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
651
652       // Finish all Running threads are working
653       if(mRunningTasks.size() >= mTasks.GetElementCount())
654       {
655         return;
656       }
657     }
658   }
659
660   size_t count = mTasks.GetElementCount();
661   size_t index = 0;
662   while(index++ < count)
663   {
664     auto processHelperIt = mTasks.GetNext();
665     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
666     if(processHelperIt->Request())
667     {
668       break;
669     }
670     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
671   }
672
673   // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
674   RegisterProcessor();
675
676   return;
677 }
678
679 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
680 {
681   if(task)
682   {
683     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
684
685     // Check whether we need to unregister processor.
686     // If there is some non-empty queue exist, we don't need to unregister processor.
687     bool needCheckUnregisterProcessor = true;
688
689     uint32_t removedCount = 0u;
690
691     {
692       // Lock while remove task from the queue
693       Mutex::ScopedLock lock(mWaitingTasksMutex);
694
695       auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
696       if(mapIter != mCacheImpl->mWaitingTasksCache.end())
697       {
698         for(auto& iterator : mapIter->second)
699         {
700           DALI_ASSERT_DEBUG((*iterator) == task);
701           if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
702           {
703             // Decrease the number of waiting tasks for high priority.
704             --mWaitingHighProirityTaskCounts;
705           }
706           mWaitingTasks.erase(iterator);
707           ++removedCount;
708         }
709         CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
710       }
711
712       if(!mWaitingTasks.empty())
713       {
714         needCheckUnregisterProcessor = false;
715       }
716     }
717
718     {
719       // Lock while remove task from the queue
720       Mutex::ScopedLock lock(mRunningTasksMutex);
721
722       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
723       if(mapIter != mCacheImpl->mRunningTasksCache.end())
724       {
725         for(auto& iterator : mapIter->second)
726         {
727           DALI_ASSERT_DEBUG((*iterator).first == task);
728           // We cannot erase container. Just mark as canceled.
729           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
730           (*iterator).second = RunningTaskState::CANCELED;
731           ++removedCount;
732         }
733       }
734
735       if(!mRunningTasks.empty())
736       {
737         needCheckUnregisterProcessor = false;
738       }
739     }
740
741     {
742       // Lock while remove task from the queue
743       Mutex::ScopedLock lock(mCompletedTasksMutex);
744
745       auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
746       if(mapIter != mCacheImpl->mCompletedTasksCache.end())
747       {
748         for(auto& iterator : mapIter->second)
749         {
750           DALI_ASSERT_DEBUG(iterator->first == task);
751           mCompletedTasks.erase(iterator);
752           ++removedCount;
753         }
754         CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
755       }
756
757       if(!mCompletedTasks.empty())
758       {
759         needCheckUnregisterProcessor = false;
760       }
761     }
762
763     // Remove TasksCompleted callback trace
764     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist() && removedCount > 0u)
765     {
766       mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
767     }
768
769     // UnregisterProcessor required to lock mutex. Call this API only if required.
770     if(needCheckUnregisterProcessor)
771     {
772       UnregisterProcessor();
773     }
774   }
775 }
776
777 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
778 {
779   // mTasksCompletedImpl will take ownership of callback.
780   Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
781
782   bool taskAdded = false; ///< Flag whether at least one task tracing now.
783
784   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "SetCompletedCallback id : %u, mask : %d\n", tasksCompletedId, static_cast<int32_t>(mask));
785
786   // Please be careful the order of mutex, to avoid dead lock.
787   {
788     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
789     {
790       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
791       {
792         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
793
794         // Collect all tasks from waiting tasks
795         for(auto& task : mWaitingTasks)
796         {
797           auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
798                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
799
800           if((checkMask & mask) == checkMask)
801           {
802             taskAdded = true;
803             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
804           }
805         }
806
807         // Collect all tasks from running tasks
808         for(auto& taskPair : mRunningTasks)
809         {
810           auto& task      = taskPair.first;
811           auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
812                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
813
814           if((checkMask & mask) == checkMask)
815           {
816             taskAdded = true;
817             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
818           }
819         }
820
821         // Collect all tasks from complete tasks
822         for(auto& taskPair : mCompletedTasks)
823         {
824           auto& task      = taskPair.first;
825           auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
826                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
827
828           if((checkMask & mask) == checkMask)
829           {
830             taskAdded = true;
831             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
832           }
833         }
834       }
835     }
836   }
837
838   // If there is nothing to check task, just excute callback right now.
839   if(!taskAdded)
840   {
841     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompletedCallback id[%u] executed now due to no task exist\n", tasksCompletedId);
842
843     mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
844   }
845   return tasksCompletedId;
846 }
847
/**
 * @brief Remove a callback registered by SetCompletedCallback(). Forwards to TasksCompletedImpl::RemoveTasksCompleted().
 * @param[in] tasksCompletedId The id returned by SetCompletedCallback().
 * @return True if the id was found and removed. False otherwise.
 */
bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
{
  return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
}
852
853 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
854 {
855   std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
856
857   AsyncTaskPtr nextCompletedTask = nullptr;
858   {
859     // Lock while popping task out from the queue
860     Mutex::ScopedLock lock(mCompletedTasksMutex);
861
862     while(!mCompletedTasks.empty())
863     {
864       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
865
866       auto               next      = mCompletedTasks.begin();
867       AsyncTaskPtr       nextTask  = next->first;
868       CompletedTaskState taskState = next->second;
869       CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
870       mCompletedTasks.erase(next);
871
872       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
873
874       if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
875       {
876         nextCompletedTask = nextTask;
877         break;
878       }
879
880       ignoredTaskList.push_back(nextTask);
881     }
882
883     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
884   }
885
886   return nextCompletedTask;
887 }
888
889 void AsyncTaskManager::RegisterProcessor()
890 {
891   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
892   {
893     Dali::Adaptor::Get().RegisterProcessor(*this);
894     mProcessorRegistered = true;
895   }
896 }
897
898 void AsyncTaskManager::UnregisterProcessor()
899 {
900   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
901   {
902     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
903     // Keep processor at least 1 task exist.
904     // Please be careful the order of mutex, to avoid dead lock.
905     // TODO : Should we lock all mutex rightnow?
906     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
907     if(mWaitingTasks.empty())
908     {
909       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
910       if(mRunningTasks.empty())
911       {
912         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
913         if(mCompletedTasks.empty())
914         {
915           mProcessorRegistered = false;
916           Dali::Adaptor::Get().UnregisterProcessor(*this);
917         }
918       }
919     }
920     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
921   }
922 }
923
924 void AsyncTaskManager::TasksCompleted()
925 {
926   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
927   while(AsyncTaskPtr task = PopNextCompletedTask())
928   {
929     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
930     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
931
932     // Remove TasksCompleted callback trace
933     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
934     {
935       mTasksCompletedImpl->RemoveTaskTrace(task);
936     }
937   }
938
939   UnregisterProcessor();
940   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
941
942   mTasksCompletedImpl->EmitCompletedTasks();
943 }
944
945 void AsyncTaskManager::Process(bool postProcessor)
946 {
947   TasksCompleted();
948 }
949
950 /// Worker thread called
951 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
952 {
953   // Lock while popping task out from the queue
954   Mutex::ScopedLock lock(mWaitingTasksMutex);
955
956   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
957
958   // pop out the next task from the queue
959   AsyncTaskPtr nextTask = nullptr;
960
961   // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
962   if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
963   {
964     // For thread safety
965     Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
966
967     if(mAvaliableLowPriorityTaskCounts == 0u)
968     {
969       // There are no avaliabe tasks to run now. Return nullptr.
970       return nextTask;
971     }
972   }
973
974   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
975   {
976     if((*iter)->IsReady())
977     {
978       const auto priorityType  = (*iter)->GetPriorityType();
979       bool       taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
980       if(!taskAvaliable)
981       {
982         // For thread safety
983         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
984
985         taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
986       }
987
988       if(taskAvaliable)
989       {
990         nextTask = *iter;
991
992         // Add Running queue
993         {
994           // Lock while popping task out from the queue
995           Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
996
997           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
998
999           auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
1000           CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
1001
1002           CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
1003           mWaitingTasks.erase(iter);
1004
1005           // Decrease avaliable task counts if it is low priority
1006           if(priorityType == AsyncTask::PriorityType::LOW)
1007           {
1008             // We are under running task mutex. We can decrease it.
1009             --mAvaliableLowPriorityTaskCounts;
1010           }
1011         }
1012
1013         if(priorityType == AsyncTask::PriorityType::HIGH)
1014         {
1015           // Decrease the number of waiting tasks for high priority.
1016           --mWaitingHighProirityTaskCounts;
1017         }
1018         break;
1019       }
1020     }
1021   }
1022
1023   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
1024
1025   return nextTask;
1026 }
1027
1028 /// Worker thread called
1029 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
1030 {
1031   bool notify = false;
1032
1033   if(task)
1034   {
1035     bool needTrigger = false;
1036
1037     // Lock while check validation of task.
1038     {
1039       Mutex::ScopedLock lock(mRunningTasksMutex);
1040
1041       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1042       if(mapIter != mCacheImpl->mRunningTasksCache.end())
1043       {
1044         const auto cacheIter = mapIter->second.begin();
1045         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1046
1047         const auto iter = *cacheIter;
1048         DALI_ASSERT_DEBUG(iter->first == task);
1049         if(iter->second == RunningTaskState::RUNNING)
1050         {
1051           // This task is valid.
1052           notify = true;
1053         }
1054       }
1055
1056       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
1057     }
1058
1059     // We should execute this tasks complete callback out of mutex
1060     if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
1061     {
1062       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
1063       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
1064
1065       // We need to remove task trace now.
1066       if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
1067       {
1068         mTasksCompletedImpl->RemoveTaskTrace(task);
1069
1070         if(mTasksCompletedImpl->IsExecuteCallbackExist())
1071         {
1072           // We need to call EmitCompletedTasks(). Trigger main thread.
1073           needTrigger = true;
1074         }
1075       }
1076     }
1077
1078     // Lock while adding task to the queue
1079     {
1080       Mutex::ScopedLock lock(mRunningTasksMutex);
1081
1082       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1083       if(mapIter != mCacheImpl->mRunningTasksCache.end())
1084       {
1085         const auto cacheIter = mapIter->second.begin();
1086         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1087
1088         const auto iter         = *cacheIter;
1089         const auto priorityType = iter->first->GetPriorityType();
1090         // Increase avaliable task counts if it is low priority
1091         if(priorityType == AsyncTask::PriorityType::LOW)
1092         {
1093           // We are under running task mutex. We can increase it.
1094           ++mAvaliableLowPriorityTaskCounts;
1095         }
1096
1097         // Move task into completed, for ensure that AsyncTask destroy at main thread.
1098         {
1099           Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
1100
1101           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
1102
1103           needTrigger |= callbackRequired;
1104
1105           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
1106
1107           auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1108           CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
1109
1110           CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
1111           mRunningTasks.erase(iter);
1112
1113           if(!needTrigger)
1114           {
1115             needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1116           }
1117
1118           // Now, task is invalidate.
1119           task.Reset();
1120         }
1121       }
1122     }
1123
1124     // Wake up the main thread
1125     if(needTrigger)
1126     {
1127       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
1128       mTrigger->Trigger();
1129     }
1130   }
1131 }
1132
1133 // AsyncTaskManager::TaskHelper
1134
1135 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1136 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1137 {
1138 }
1139
1140 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1141 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1142 {
1143 }
1144
1145 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1146 : mProcessor(std::move(processor)),
1147   mAsyncTaskManager(asyncTaskManager)
1148 {
1149 }
1150
1151 bool AsyncTaskManager::TaskHelper::Request()
1152 {
1153   return mProcessor->Request();
1154 }
1155 } // namespace Adaptor
1156
1157 } // namespace Internal
1158
1159 } // namespace Dali