Merge "DALi Version 2.3.2" into devel/master
[platform/core/uifw/dali-adaptor.git] / dali / internal / system / common / async-task-manager-impl.cpp
1 /*
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17
18 // CLASS HEADER
19 #include <dali/internal/system/common/async-task-manager-impl.h>
20
21 // EXTERNAL INCLUDES
22 #include <dali/devel-api/adaptor-framework/environment-variable.h>
23 #include <dali/devel-api/adaptor-framework/thread-settings.h>
24 #include <dali/devel-api/common/singleton-service.h>
25 #include <dali/integration-api/adaptor-framework/adaptor.h>
26 #include <dali/integration-api/debug.h>
27
28 #include <unordered_map>
29
30 namespace Dali
31 {
32 namespace Internal
33 {
34 namespace Adaptor
35 {
36 namespace
37 {
38 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
39
40 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
41 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV     = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
42
43 // The number of threads for low priority task.
44 constexpr auto DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS = size_t{6u};
45 constexpr auto NUMBER_OF_LOW_PRIORITY_THREADS_ENV     = "DALI_ASYNC_MANAGER_LOW_PRIORITY_THREAD_POOL_SIZE";
46
47 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
48 {
49   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
50   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
51   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
52   DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
53   return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
54 }
55
56 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
57 {
58   auto numberString    = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
59   auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
60   DALI_ASSERT_DEBUG(numberOfThreads <= maxValue);
61   return (numberOfThreads > 0 && numberOfThreads <= maxValue) ? numberOfThreads : std::min(defaultValue, maxValue);
62 }
63
64 #if defined(DEBUG_ENABLED)
65 Debug::Filter* gAsyncTasksManagerLogFilter = Debug::Filter::New(Debug::NoLogging, false, "LOG_ASYNC_TASK_MANAGER");
66
67 uint32_t gThreadId = 0u; // Only for debug
68 #endif
69
70 } // unnamed namespace
71
72 // AsyncTaskThread
73
74 AsyncTaskThread::AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
75 : mConditionalWait(),
76   mAsyncTaskManager(asyncTaskManager),
77   mLogFactory(Dali::Adaptor::Get().GetLogFactory()),
78   mTraceFactory(Dali::Adaptor::Get().GetTraceFactory()),
79   mDestroyThread(false),
80   mIsThreadStarted(false),
81   mIsThreadIdle(true)
82 {
83 }
84
85 AsyncTaskThread::~AsyncTaskThread()
86 {
87   // Stop the thread
88   {
89     ConditionalWait::ScopedLock lock(mConditionalWait);
90     mDestroyThread = true;
91     mConditionalWait.Notify(lock);
92   }
93
94   Join();
95 }
96
97 bool AsyncTaskThread::Request()
98 {
99   if(!mIsThreadStarted)
100   {
101     Start();
102     mIsThreadStarted = true;
103   }
104
105   {
106     // Lock while adding task to the queue
107     ConditionalWait::ScopedLock lock(mConditionalWait);
108
109     if(mIsThreadIdle)
110     {
111       mIsThreadIdle = false;
112
113       // wake up the thread
114       mConditionalWait.Notify(lock);
115       return true;
116     }
117   }
118
119   return false;
120 }
121
122 void AsyncTaskThread::Run()
123 {
124 #if defined(DEBUG_ENABLED)
125   uint32_t threadId = gThreadId++;
126   {
127     char temp[100];
128     snprintf(temp, 100, "AsyncTaskThread[%u]", threadId);
129     SetThreadName(temp);
130   }
131 #else
132   SetThreadName("AsyncTaskThread");
133 #endif
134   mLogFactory.InstallLogFunction();
135   mTraceFactory.InstallTraceFunction();
136
137   while(!mDestroyThread)
138   {
139     AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
140     if(!task)
141     {
142       ConditionalWait::ScopedLock lock(mConditionalWait);
143       if(!mDestroyThread)
144       {
145         mIsThreadIdle = true;
146         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] wait\n", threadId);
147         mConditionalWait.Wait(lock);
148         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] awake\n", threadId);
149       }
150     }
151     else
152     {
153       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Process task [%p]\n", threadId, task.Get());
154       task->Process();
155       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Thread[%u] Complete task [%p]\n", threadId, task.Get());
156       if(!mDestroyThread)
157       {
158         mAsyncTaskManager.CompleteTask(std::move(task));
159       }
160     }
161   }
162 }
163
164 // AsyncTaskManager::TasksCompletedImpl
165
166 struct AsyncTaskManager::TasksCompletedImpl
167 {
168   TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
169   : mManager(manager),
170     mTrigger(trigger),
171     mEmitCompletedTaskTriggered(false)
172   {
173   }
174
175 public:
176   /**
177    * @brief Create new tasks completed id and.
178    * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
179    * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
180    */
181   Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
182   {
183     // Lock while adding tasks completed callback list to the queue
184     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
185
186     auto id = mTasksCompletedCount++;
187     DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
188
189     mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
190
191     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "GenerateTasksCompletedId id[%u] callback[%p]\n", id, callback);
192     return id;
193   }
194
195   /**
196    * @brief Append task that will be trace.
197    * @post RemoveTaskTrace should be called.
198    * @param[in] id The id of tasks completed.
199    * @param[in] task The task want to trace.
200    */
201   void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
202   {
203     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AppendTaskTrace id[%u] task[%p]\n", id, task.Get());
204
205     // Lock while adding tasks completed callback list to the queue
206     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
207
208     auto iter = mTasksCompletedCallbackList.find(id);
209     if(iter == mTasksCompletedCallbackList.end())
210     {
211       // This task is already erased. Ignore.
212       return;
213     }
214
215     auto& callbackData = iter->second;
216
217     auto jter = callbackData.mTasks.find(task.Get());
218
219     if(jter != callbackData.mTasks.end())
220     {
221       // Increase reference count.
222       ++(jter->second);
223     }
224     else
225     {
226       callbackData.mTasks.insert({task.Get(), 1u});
227     }
228   }
229
230   /**
231    * @brief Remove all task that were traced.
232    * @param[in] task The task want to remove trace.
233    * @param[in] taskCount The number of tasks that will be removed.
234    */
235   void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
236   {
237     if(count == 0u)
238     {
239       return;
240     }
241     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace task[%p] remove count[%u]\n", task.Get(), count);
242
243     // Lock while removing tasks completed callback list to the queue
244     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
245
246     for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
247     {
248       auto& callbackData      = iter->second;
249       bool  eraseCallbackData = false;
250
251       auto jter = callbackData.mTasks.find(task.Get());
252
253       if(jter != callbackData.mTasks.end())
254       {
255         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task[%p], current refcount[%u]\n", iter->first, task.Get(), (jter->second));
256
257         if(jter->second <= count)
258         {
259           callbackData.mTasks.erase(jter);
260
261           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTaskTrace id[%u] task erased. remained tasks[%zu]", iter->first, callbackData.mTasks.size());
262
263           if(callbackData.mTasks.empty())
264           {
265             eraseCallbackData = true;
266
267             // Move callback base into list.
268             // (To avoid task container changed during callback emit)
269             RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
270
271             DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
272
273             iter = mTasksCompletedCallbackList.erase(iter);
274           }
275         }
276         else
277         {
278           jter->second -= count;
279         }
280       }
281
282       if(!eraseCallbackData)
283       {
284         ++iter;
285       }
286     }
287   }
288
289   /**
290    * @brief Check whether current TasksCompletedId completed or not.
291    * @param[in] id The id of tasks completed.
292    * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
293    */
294   bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
295   {
296     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CheckTasksCompletedCallbackCompleted[%u]\n", id);
297
298     // Lock while removing tasks completed callback list to the queue
299     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
300
301     auto iter = mTasksCompletedCallbackList.find(id);
302     if(iter != mTasksCompletedCallbackList.end())
303     {
304       auto& callbackData = iter->second;
305       if(callbackData.mTasks.empty())
306       {
307         // Move callback base into list.
308         // (To avoid task container changed during callback emit)
309         RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
310
311         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "id[%u] completed!\n", iter->first);
312
313         iter = mTasksCompletedCallbackList.erase(iter);
314
315         return true;
316       }
317     }
318
319     return false;
320   }
321
322   /**
323    * @brief Remove taskS completed callbacks by id.
324    * @param[in] id The id of taskS completed.
325    * @return True if taskS completed id removed. False otherwise.
326    */
327   bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
328   {
329     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTasksCompleted[%u]\n", id);
330
331     // Lock while removing taskS completed callback list to the queue
332     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
333
334     auto iter = mTasksCompletedCallbackList.find(id);
335     if(iter == mTasksCompletedCallbackList.end())
336     {
337       // This task is already erased, or completed.
338       // Erase from completed excute callback list.
339
340       // Lock while removing excute callback list to the queue
341       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
342
343       for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
344       {
345         if(iter->second == id)
346         {
347           iter = mExcuteCallbackList.erase(iter);
348
349           return true;
350         }
351         else
352         {
353           ++iter;
354         }
355       }
356
357       // This task is alread erased and completed. Ignore.
358       return false;
359     }
360
361     mTasksCompletedCallbackList.erase(iter);
362
363     return true;
364   }
365
366   /**
367    * @brief Emit all completed callbacks.
368    * @note This API should be called at event thread.
369    */
370   void EmitCompletedTasks()
371   {
372     ExecuteCallbackContainer executeCallbackList;
373     {
374       // Lock while removing excute callback list to the queue
375       Mutex::ScopedLock lock(mExcuteCallbacksMutex);
376
377       mEmitCompletedTaskTriggered = false;
378
379       // Copy callback lists, for let we execute callbacks out of mutex
380       executeCallbackList = std::move(mExcuteCallbackList);
381       mExcuteCallbackList.clear();
382     }
383
384     if(!executeCallbackList.empty())
385     {
386       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback count[%zu]\n", executeCallbackList.size());
387       // Execute all callbacks
388       for(auto&& callbackPair : executeCallbackList)
389       {
390         auto& callback = callbackPair.first;
391         auto  id       = callbackPair.second;
392
393         DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute taskS completed callback[%p] for id[%u]\n", callback.get(), id);
394
395         Dali::CallbackBase::Execute(*callback, id);
396       }
397
398       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Excute callback end\n");
399     }
400   }
401
402   /**
403    * @brief Check whether there is some completed signal what we need to trace, or not.
404    * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
405    */
406   bool IsTasksCompletedCallbackExist()
407   {
408     Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
409     return !mTasksCompletedCallbackList.empty();
410   }
411
412   /**
413    * @brief Check whether there is some completed signal what we need to execute, or not.
414    * @return True if mExcuteCallbackList is not empty. False otherwise.
415    */
416   bool IsExecuteCallbackExist()
417   {
418     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
419     return !mExcuteCallbackList.empty();
420   }
421
422 private:
423   void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
424   {
425     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted[%u] need to be execute with callback[%p]\n", id, callback.get());
426
427     // Lock while adding excute callback list to the queue
428     Mutex::ScopedLock lock(mExcuteCallbacksMutex);
429
430     mExcuteCallbackList.emplace_back(std::move(callback), id);
431
432     if(!mEmitCompletedTaskTriggered)
433     {
434       mEmitCompletedTaskTriggered = true;
435
436       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger processor\n");
437       mTrigger->Trigger();
438     }
439   }
440
441 private:
442   struct CallbackData
443   {
444   public:
445     CallbackData(CallbackBase* callback)
446     : mCallback(callback),
447       mTasks()
448     {
449     }
450
451     CallbackData(CallbackData&& rhs) noexcept
452     : mCallback(std::move(rhs.mCallback)),
453       mTasks(std::move(rhs.mTasks))
454     {
455     }
456
457     CallbackData& operator=(CallbackData&& rhs) noexcept
458     {
459       if(this != &rhs)
460       {
461         mCallback = std::move(rhs.mCallback);
462         mTasks    = std::move(rhs.mTasks);
463       }
464
465       return *this;
466     }
467
468   private:
469     // Delete copy operator.
470     CallbackData(const CallbackData& rhs) = delete;
471     CallbackData& operator=(const CallbackData& rhs) = delete;
472
473   public:
474     std::unique_ptr<CallbackBase>                  mCallback;
475     std::unordered_map<const AsyncTask*, uint32_t> mTasks;
476   };
477
478 private:
479   AsyncTaskManager&    mManager; ///< Owner of this CacheImpl.
480   EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
481
482   Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
483
484   using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
485   TasksCompletedContainer mTasksCompletedCallbackList;
486
487   using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
488   ExecuteCallbackContainer mExcuteCallbackList;
489
490   Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
491   Dali::Mutex mExcuteCallbacksMutex;         ///< Mutex for mExcuteCallbackList.
492
493   bool mEmitCompletedTaskTriggered : 1;
494 };
495
496 // AsyncTaskManager::CacheImpl
497
498 struct AsyncTaskManager::CacheImpl
499 {
500   CacheImpl(AsyncTaskManager& manager)
501   : mManager(manager)
502   {
503   }
504
505 public:
506   // Insert / Erase task cache API.
507
508   /**
509    * @brief Insert cache that input task.
510    * @pre Mutex be locked.
511    */
512   template<typename CacheContainer, typename Iterator>
513   static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
514   {
515     auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
516     cacheContainer.insert(cacheContainer.end(), iterator);
517   }
518
519   /**
520    * @brief Erase cache that input task.
521    * @pre Mutex be locked.
522    */
523   template<typename CacheContainer, typename Iterator>
524   static void EraseTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
525   {
526     auto mapIter = cacheMap.find(task.Get());
527     if(mapIter != cacheMap.end())
528     {
529       auto& cacheContainer = (*mapIter).second;
530       auto  cacheIter      = std::find(cacheContainer.begin(), cacheContainer.end(), iterator);
531
532       if(cacheIter != cacheContainer.end())
533       {
534         cacheContainer.erase(cacheIter);
535         if(cacheContainer.empty())
536         {
537           cacheMap.erase(mapIter);
538         }
539       }
540     }
541   }
542
543   /**
544    * @brief Erase all cache that input task.
545    * @pre Mutex be locked.
546    */
547   template<typename CacheContainer>
548   static void EraseAllTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task)
549   {
550     auto mapIter = cacheMap.find(task.Get());
551     if(mapIter != cacheMap.end())
552     {
553       cacheMap.erase(mapIter);
554     }
555   }
556
557 public:
558   AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
559
560   // Keep cache iterators as list since we take tasks by FIFO as default.
561   using TaskCacheContainer          = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
562   using RunningTaskCacheContainer   = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
563   using CompletedTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncCompletedTaskContainer::iterator>>;
564
565   TaskCacheContainer          mWaitingTasksCache;   ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
566   RunningTaskCacheContainer   mRunningTasksCache;   ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
567   CompletedTaskCacheContainer mCompletedTasksCache; ///< The cache of tasks and iterator for completed async process. Must be locked under mCompletedTasksMutex.
568 };
569
570 // AsyncTaskManager
571
572 Dali::AsyncTaskManager AsyncTaskManager::Get()
573 {
574   Dali::AsyncTaskManager manager;
575   SingletonService       singletonService(SingletonService::Get());
576   if(singletonService)
577   {
578     // Check whether the async task manager is already created
579     Dali::BaseHandle handle = singletonService.GetSingleton(typeid(Dali::AsyncTaskManager));
580     if(handle)
581     {
582       // If so, downcast the handle of singleton
583       manager = Dali::AsyncTaskManager(dynamic_cast<Internal::Adaptor::AsyncTaskManager*>(handle.GetObjectPtr()));
584     }
585
586     if(!manager)
587     {
588       // If not, create the async task manager and register it as a singleton
589       Internal::Adaptor::AsyncTaskManager* internalAsyncTaskManager = new Internal::Adaptor::AsyncTaskManager();
590       manager                                                       = Dali::AsyncTaskManager(internalAsyncTaskManager);
591       singletonService.Register(typeid(manager), manager);
592     }
593   }
594   return manager;
595 }
596
597 AsyncTaskManager::AsyncTaskManager()
598 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
599   mAvaliableLowPriorityTaskCounts(GetNumberOfLowPriorityThreads(NUMBER_OF_LOW_PRIORITY_THREADS_ENV, DEFAULT_NUMBER_OF_LOW_PRIORITY_THREADS, mTasks.GetElementCount())),
600   mWaitingHighProirityTaskCounts(0u),
601   mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TasksCompleted))),
602   mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get())),
603   mCacheImpl(new CacheImpl(*this)),
604   mProcessorRegistered(false)
605 {
606 }
607
608 AsyncTaskManager::~AsyncTaskManager()
609 {
610   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
611   {
612     mProcessorRegistered = false;
613     Dali::Adaptor::Get().UnregisterProcessor(*this);
614   }
615
616   // Join all threads.
617   mTasks.Clear();
618
619   // Remove task completed impl and cache impl after all threads are join.
620   mTasksCompletedImpl.reset();
621   mCacheImpl.reset();
622
623   // Remove tasks after CacheImpl removed
624   mWaitingTasks.clear();
625   mRunningTasks.clear();
626   mCompletedTasks.clear();
627 }
628
629 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
630 {
631   if(task)
632   {
633     // Lock while adding task to the queue
634     Mutex::ScopedLock lock(mWaitingTasksMutex);
635
636     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "AddTask [%p]\n", task.Get());
637
638     // push back into waiting queue.
639     auto waitingIter = mWaitingTasks.insert(mWaitingTasks.end(), task);
640     CacheImpl::InsertTaskCache(mCacheImpl->mWaitingTasksCache, task, waitingIter);
641
642     if(task->GetPriorityType() == AsyncTask::PriorityType::HIGH)
643     {
644       // Increase the number of waiting tasks for high priority.
645       ++mWaitingHighProirityTaskCounts;
646     }
647
648     {
649       // For thread safety
650       Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
651
652       // Finish all Running threads are working
653       if(mRunningTasks.size() >= mTasks.GetElementCount())
654       {
655         return;
656       }
657     }
658   }
659
660   size_t count = mTasks.GetElementCount();
661   size_t index = 0;
662   while(index++ < count)
663   {
664     auto processHelperIt = mTasks.GetNext();
665     DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
666     if(processHelperIt->Request())
667     {
668       break;
669     }
670     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
671   }
672
673   // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
674   RegisterProcessor();
675
676   return;
677 }
678
679 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
680 {
681   if(task)
682   {
683     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
684
685     // Check whether we need to unregister processor.
686     // If there is some non-empty queue exist, we don't need to unregister processor.
687     bool needCheckUnregisterProcessor = true;
688
689     uint32_t removedCount = 0u;
690
691     {
692       // Lock while remove task from the queue
693       Mutex::ScopedLock lock(mWaitingTasksMutex);
694
695       auto mapIter = mCacheImpl->mWaitingTasksCache.find(task.Get());
696       if(mapIter != mCacheImpl->mWaitingTasksCache.end())
697       {
698         for(auto& iterator : mapIter->second)
699         {
700           DALI_ASSERT_DEBUG((*iterator) == task);
701           if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
702           {
703             // Decrease the number of waiting tasks for high priority.
704             --mWaitingHighProirityTaskCounts;
705           }
706           mWaitingTasks.erase(iterator);
707           ++removedCount;
708         }
709         CacheImpl::EraseAllTaskCache(mCacheImpl->mWaitingTasksCache, task);
710       }
711
712       if(!mWaitingTasks.empty())
713       {
714         needCheckUnregisterProcessor = false;
715       }
716     }
717
718     {
719       // Lock while remove task from the queue
720       Mutex::ScopedLock lock(mRunningTasksMutex);
721
722       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
723       if(mapIter != mCacheImpl->mRunningTasksCache.end())
724       {
725         for(auto& iterator : mapIter->second)
726         {
727           DALI_ASSERT_DEBUG((*iterator).first == task);
728           // We cannot erase container. Just mark as canceled.
729           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
730           if((*iterator).second == RunningTaskState::RUNNING)
731           {
732             (*iterator).second = RunningTaskState::CANCELED;
733             ++removedCount;
734           }
735         }
736       }
737
738       if(!mRunningTasks.empty())
739       {
740         needCheckUnregisterProcessor = false;
741       }
742     }
743
744     {
745       // Lock while remove task from the queue
746       Mutex::ScopedLock lock(mCompletedTasksMutex);
747
748       auto mapIter = mCacheImpl->mCompletedTasksCache.find(task.Get());
749       if(mapIter != mCacheImpl->mCompletedTasksCache.end())
750       {
751         for(auto& iterator : mapIter->second)
752         {
753           DALI_ASSERT_DEBUG((*iterator).first == task);
754           if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
755           {
756             ++removedCount;
757           }
758           mCompletedTasks.erase(iterator);
759         }
760         CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
761       }
762
763       if(!mCompletedTasks.empty())
764       {
765         needCheckUnregisterProcessor = false;
766       }
767     }
768
769     // Remove TasksCompleted callback trace
770     if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
771     {
772       mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
773     }
774
775     // UnregisterProcessor required to lock mutex. Call this API only if required.
776     if(needCheckUnregisterProcessor)
777     {
778       UnregisterProcessor();
779     }
780   }
781 }
782
783 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
784 {
785   // mTasksCompletedImpl will take ownership of callback.
786   Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
787
788   bool taskAdded = false; ///< Flag whether at least one task tracing now.
789
790   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "SetCompletedCallback id : %u, mask : %d\n", tasksCompletedId, static_cast<int32_t>(mask));
791
792   // Please be careful the order of mutex, to avoid dead lock.
793   {
794     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
795     {
796       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
797       {
798         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
799
800         // Collect all tasks from waiting tasks
801         for(auto& task : mWaitingTasks)
802         {
803           auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
804                            (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
805
806           if((checkMask & mask) == checkMask)
807           {
808             taskAdded = true;
809             mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
810           }
811         }
812
813         // Collect all tasks from running tasks
814         for(auto& taskPair : mRunningTasks)
815         {
816           // Trace only if it is running now.
817           if(taskPair.second == RunningTaskState::RUNNING)
818           {
819             auto& task      = taskPair.first;
820             auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
821                              (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
822
823             if((checkMask & mask) == checkMask)
824             {
825               taskAdded = true;
826               mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
827             }
828           }
829         }
830
831         // Collect all tasks from complete tasks
832         for(auto& taskPair : mCompletedTasks)
833         {
834           // Trace only if it is need callback.
835           // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
836           //        If worker thread invocation, than it already remove trace at completed timing.
837           //        If canceled cases, we don't append trace at running tasks already.
838           //        So, we don't need to trace for SKIP_CALLBACK cases.
839           if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
840           {
841             auto& task      = taskPair.first;
842             auto  checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
843                              (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
844
845             if((checkMask & mask) == checkMask)
846             {
847               taskAdded = true;
848               mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
849             }
850           }
851         }
852       }
853     }
854   }
855
856   // If there is nothing to check task, just excute callback right now.
857   if(!taskAdded)
858   {
859     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompletedCallback id[%u] executed now due to no task exist\n", tasksCompletedId);
860
861     mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
862   }
863   return tasksCompletedId;
864 }
865
866 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
867 {
868   return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
869 }
870
871 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
872 {
873   std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
874
875   AsyncTaskPtr nextCompletedTask = nullptr;
876   {
877     // Lock while popping task out from the queue
878     Mutex::ScopedLock lock(mCompletedTasksMutex);
879
880     while(!mCompletedTasks.empty())
881     {
882       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextCompletedTask, completed task count : [%zu]\n", mCompletedTasks.size());
883
884       auto               next      = mCompletedTasks.begin();
885       AsyncTaskPtr       nextTask  = next->first;
886       CompletedTaskState taskState = next->second;
887       CacheImpl::EraseTaskCache(mCacheImpl->mCompletedTasksCache, nextTask, next);
888       mCompletedTasks.erase(next);
889
890       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Completed task [%p] (callback required? : %d)\n", nextTask.Get(), taskState == CompletedTaskState::REQUIRE_CALLBACK);
891
892       if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
893       {
894         nextCompletedTask = nextTask;
895         break;
896       }
897
898       ignoredTaskList.push_back(nextTask);
899     }
900
901     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup completed [%p]\n", nextCompletedTask.Get());
902   }
903
904   return nextCompletedTask;
905 }
906
907 void AsyncTaskManager::RegisterProcessor()
908 {
909   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
910   {
911     Dali::Adaptor::Get().RegisterProcessor(*this);
912     mProcessorRegistered = true;
913   }
914 }
915
916 void AsyncTaskManager::UnregisterProcessor()
917 {
918   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
919   {
920     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor begin\n");
921     // Keep processor at least 1 task exist.
922     // Please be careful the order of mutex, to avoid dead lock.
923     // TODO : Should we lock all mutex rightnow?
924     Mutex::ScopedLock lockWait(mWaitingTasksMutex);
925     if(mWaitingTasks.empty())
926     {
927       Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
928       if(mRunningTasks.empty())
929       {
930         Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
931         if(mCompletedTasks.empty())
932         {
933           mProcessorRegistered = false;
934           Dali::Adaptor::Get().UnregisterProcessor(*this);
935         }
936       }
937     }
938     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "UnregisterProcessor end (registed? %d)\n", mProcessorRegistered);
939   }
940 }
941
942 void AsyncTaskManager::TasksCompleted()
943 {
944   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted begin\n");
945   while(AsyncTaskPtr task = PopNextCompletedTask())
946   {
947     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
948     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
949
950     // Remove TasksCompleted callback trace
951     if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
952     {
953       mTasksCompletedImpl->RemoveTaskTrace(task);
954     }
955   }
956
957   UnregisterProcessor();
958   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
959
960   mTasksCompletedImpl->EmitCompletedTasks();
961 }
962
963 void AsyncTaskManager::Process(bool postProcessor)
964 {
965   TasksCompleted();
966 }
967
968 /// Worker thread called
969 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
970 {
971   // Lock while popping task out from the queue
972   Mutex::ScopedLock lock(mWaitingTasksMutex);
973
974   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "PopNextTaskToProcess, waiting task count : [%zu]\n", mWaitingTasks.size());
975
976   // pop out the next task from the queue
977   AsyncTaskPtr nextTask = nullptr;
978
979   // Fast cut if all waiting tasks are LOW priority, and we cannot excute low task anymore.
980   if(mWaitingHighProirityTaskCounts == 0u && !mWaitingTasks.empty())
981   {
982     // For thread safety
983     Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
984
985     if(mAvaliableLowPriorityTaskCounts == 0u)
986     {
987       // There are no avaliabe tasks to run now. Return nullptr.
988       return nextTask;
989     }
990   }
991
992   for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
993   {
994     if((*iter)->IsReady())
995     {
996       const auto priorityType  = (*iter)->GetPriorityType();
997       bool       taskAvaliable = priorityType == AsyncTask::PriorityType::HIGH; // Task always valid if it's priority is high
998       if(!taskAvaliable)
999       {
1000         // For thread safety
1001         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1002
1003         taskAvaliable = (mAvaliableLowPriorityTaskCounts > 0u); // priority is low, but we can use it.
1004       }
1005
1006       // Check whether we try to running same task at multiple threads.
1007       if(taskAvaliable)
1008       {
1009         Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1010         auto              mapIter = mCacheImpl->mRunningTasksCache.find((*iter).Get());
1011         if(mapIter != mCacheImpl->mRunningTasksCache.end())
1012         {
1013           if(!mapIter->second.empty())
1014           {
1015             // Some other thread running this tasks now. Ignore it.
1016             DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Some other thread running this task [%p]\n", (*iter).Get());
1017             taskAvaliable = false;
1018           }
1019         }
1020       }
1021
1022       if(taskAvaliable)
1023       {
1024         nextTask = *iter;
1025
1026         // Add Running queue
1027         {
1028           // Lock while popping task out from the queue
1029           Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
1030
1031           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
1032
1033           auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
1034           CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
1035
1036           CacheImpl::EraseTaskCache(mCacheImpl->mWaitingTasksCache, nextTask, iter);
1037           mWaitingTasks.erase(iter);
1038
1039           // Decrease avaliable task counts if it is low priority
1040           if(priorityType == AsyncTask::PriorityType::LOW)
1041           {
1042             // We are under running task mutex. We can decrease it.
1043             --mAvaliableLowPriorityTaskCounts;
1044           }
1045         }
1046
1047         if(priorityType == AsyncTask::PriorityType::HIGH)
1048         {
1049           // Decrease the number of waiting tasks for high priority.
1050           --mWaitingHighProirityTaskCounts;
1051         }
1052         break;
1053       }
1054     }
1055   }
1056
1057   DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::General, "Pickup process [%p]\n", nextTask.Get());
1058
1059   return nextTask;
1060 }
1061
1062 /// Worker thread called
1063 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
1064 {
1065   bool notify = false;
1066
1067   if(task)
1068   {
1069     bool needTrigger = false;
1070
1071     // Lock while check validation of task.
1072     {
1073       Mutex::ScopedLock lock(mRunningTasksMutex);
1074
1075       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1076       if(mapIter != mCacheImpl->mRunningTasksCache.end())
1077       {
1078         const auto cacheIter = mapIter->second.begin();
1079         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1080
1081         const auto iter = *cacheIter;
1082         DALI_ASSERT_DEBUG(iter->first == task);
1083         if(iter->second == RunningTaskState::RUNNING)
1084         {
1085           // This task is valid.
1086           notify = true;
1087         }
1088       }
1089
1090       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
1091     }
1092
1093     // We should execute this tasks complete callback out of mutex
1094     if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
1095     {
1096       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
1097       CallbackBase::Execute(*(task->GetCompletedCallback()), task);
1098
1099       // We need to remove task trace now.
1100       if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
1101       {
1102         mTasksCompletedImpl->RemoveTaskTrace(task);
1103
1104         if(mTasksCompletedImpl->IsExecuteCallbackExist())
1105         {
1106           // We need to call EmitCompletedTasks(). Trigger main thread.
1107           needTrigger = true;
1108         }
1109       }
1110     }
1111
1112     // Lock while adding task to the queue
1113     {
1114       Mutex::ScopedLock lock(mRunningTasksMutex);
1115
1116       auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
1117       if(mapIter != mCacheImpl->mRunningTasksCache.end())
1118       {
1119         const auto cacheIter = mapIter->second.begin();
1120         DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
1121
1122         const auto iter         = *cacheIter;
1123         const auto priorityType = iter->first->GetPriorityType();
1124         // Increase avaliable task counts if it is low priority
1125         if(priorityType == AsyncTask::PriorityType::LOW)
1126         {
1127           // We are under running task mutex. We can increase it.
1128           ++mAvaliableLowPriorityTaskCounts;
1129         }
1130
1131         // Move task into completed, for ensure that AsyncTask destroy at main thread.
1132         {
1133           Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
1134
1135           const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
1136
1137           needTrigger |= callbackRequired;
1138
1139           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p] (callback required? : %d)\n", task.Get(), callbackRequired);
1140
1141           auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1142           CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
1143
1144           CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
1145           mRunningTasks.erase(iter);
1146
1147           if(!needTrigger)
1148           {
1149             needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1150           }
1151
1152           // Now, task is invalidate.
1153           task.Reset();
1154         }
1155       }
1156     }
1157
1158     // Wake up the main thread
1159     if(needTrigger)
1160     {
1161       DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
1162       mTrigger->Trigger();
1163     }
1164   }
1165 }
1166
1167 // AsyncTaskManager::TaskHelper
1168
1169 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1170 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1171 {
1172 }
1173
1174 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1175 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1176 {
1177 }
1178
1179 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1180 : mProcessor(std::move(processor)),
1181   mAsyncTaskManager(asyncTaskManager)
1182 {
1183 }
1184
1185 bool AsyncTaskManager::TaskHelper::Request()
1186 {
1187   return mProcessor->Request();
1188 }
1189 } // namespace Adaptor
1190
1191 } // namespace Internal
1192
1193 } // namespace Dali