2 * Copyright (c) 2023 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <toolkit-async-task-manager.h>
22 #include <dali/devel-api/adaptor-framework/thread-settings.h>
23 #include <dali/devel-api/threading/conditional-wait.h>
24 #include <dali/devel-api/threading/mutex.h>
25 #include <dali/devel-api/threading/thread.h>
26 #include <dali/public-api/adaptor-framework/async-task-manager.h>
27 #include <dali/public-api/adaptor-framework/round-robin-container-view.h>
28 #include <dali/public-api/common/list-wrapper.h>
29 #include <dali/public-api/object/base-object.h>
31 #include <unordered_map>
34 #include <toolkit-application.h>
35 #include <toolkit-environment-variable.h>
36 #include <toolkit-event-thread-callback.h>
// Atomic counter; AsyncTaskThread's constructor pre-increments it to obtain the thread's id.
46 std::atomic_uint32_t gThreadId = 0u;
// Forward declaration; the worker thread class is defined further down in this file.
49 class AsyncTaskThread;
// Implementation object behind the Dali::AsyncTaskManager handle.
// Holds three task queues (waiting / running / completed), a round-robin view of worker-thread helpers,
// and the bookkeeping for "tasks completed" callbacks.
// NOTE(review): several short lines of this declaration (braces, access specifiers, some member
// declarations) are not visible in this excerpt.
51 class AsyncTaskManager : public Dali::BaseObject
// Returns the thread-local manager handle, creating the implementation on first use.
54 static Dali::AsyncTaskManager Get();
// Queue a task for processing / withdraw a task from every queue it may currently be in.
60 void AddTask(AsyncTaskPtr task);
61 void RemoveTask(AsyncTaskPtr task);
// Register (and later remove) a callback that is executed once all tasks traced for it have completed.
63 Dali::AsyncTaskManager::TasksCompletedId SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask);
64 bool RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId);
// Pops completed tasks until one marked REQUIRE_CALLBACK is found.
67 AsyncTaskPtr PopNextCompletedTask();
// Executes the completed callback of every popped completed task, then emits pending tasks-completed callbacks.
69 void TaskAllCompleted();
71 public: // Worker thread called method
// Moves the first ready waiting task into the running queue.
72 AsyncTaskPtr PopNextTaskToProcess();
// Moves a processed task from the running queue into the completed queue and triggers the main thread when required.
73 void CompleteTask(AsyncTaskPtr&& task);
76 * @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
82 * @brief Create an TaskHelper.
84 * @param[in] asyncTaskManager Reference to the AsyncTaskManager
86 TaskHelper(AsyncTaskManager& asyncTaskManager);
89 * @brief Request the thread to process the task.
90 * @return True if the request succeeds, otherwise false.
// TaskHelper is move-constructible only: copy operations and move assignment are deleted.
95 TaskHelper(const TaskHelper&) = delete;
96 TaskHelper& operator=(const TaskHelper&) = delete;
98 TaskHelper(TaskHelper&& rhs);
99 TaskHelper& operator=(TaskHelper&& rhs) = delete;
103 * @brief Main constructor that used by all other constructors
105 TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager);
// The helper owns its worker thread and keeps a reference to the manager that created it.
108 std::unique_ptr<AsyncTaskThread> mProcessor;
109 AsyncTaskManager& mAsyncTaskManager;
113 * @brief State of running task
115 enum class RunningTaskState
117 RUNNING = 0, ///< Running task
118 CANCELED = 1, ///< Canceled by user
122 * @brief State of complete task
124 enum class CompletedTaskState
126 REQUIRE_CALLBACK = 0, ///< Need to execute callback when completed task process.
127 SKIP_CALLBACK = 1, ///< Do not execute callback
// Copy constructor and copy assignment are declared here; no definition is visible in this excerpt.
132 AsyncTaskManager(const AsyncTaskManager& manager);
135 AsyncTaskManager& operator=(const AsyncTaskManager& manager);
138 // Keep Task as list since we take tasks by FIFO as default.
139 using AsyncTaskContainer = std::list<AsyncTaskPtr>;
// Running and completed queues pair each task with its per-queue state.
141 using AsyncRunningTaskPair = std::pair<AsyncTaskPtr, RunningTaskState>;
142 using AsyncRunningTaskContainer = std::list<AsyncRunningTaskPair>;
144 using AsyncCompletedTaskPair = std::pair<AsyncTaskPtr, CompletedTaskState>;
145 using AsyncCompletedTaskContainer = std::list<AsyncCompletedTaskPair>;
147 AsyncTaskContainer mWaitingTasks; ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
148 AsyncRunningTaskContainer mRunningTasks; ///< The queue of the running tasks. Must be locked under mRunningTasksMutex.
149 AsyncCompletedTaskContainer mCompletedTasks; ///< The queue of the tasks with the async process. Must be locked under mCompletedTasksMutex.
// Round-robin view over the worker helpers; AddTask() walks it to find a thread that accepts a request.
151 RoundRobinContainerView<TaskHelper> mTasks;
// Lock order used throughout this file: waiting -> running -> completed.
153 Dali::Mutex mWaitingTasksMutex; ///< Mutex for mWaitingTasks. We can lock mRunningTasksMutex and mCompletedTasksMutex under this scope.
154 Dali::Mutex mRunningTasksMutex; ///< Mutex for mRunningTasks. We can lock mCompletedTasksMutex under this scope.
155 Dali::Mutex mCompletedTasksMutex; ///< Mutex for mCompletedTasks. We cannot lock any mutex under this scope.
// Trigger used by CompleteTask() to wake the main thread; its callback is AsyncTaskManager::TaskCompleted.
157 std::unique_ptr<EventThreadCallback> mTrigger;
159 struct TasksCompletedImpl;
160 std::unique_ptr<TasksCompletedImpl> mTasksCompletedImpl; ///< TaskS completed signal interface for AsyncTaskManager.
163 inline Internal::Adaptor::AsyncTaskManager& GetImplementation(Dali::AsyncTaskManager& obj)
165 DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
167 Dali::BaseObject& handle = obj.GetBaseObject();
169 return static_cast<Internal::Adaptor::AsyncTaskManager&>(handle);
172 inline const Internal::Adaptor::AsyncTaskManager& GetImplementation(const Dali::AsyncTaskManager& obj)
174 DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
176 const Dali::BaseObject& handle = obj.GetBaseObject();
178 return static_cast<const Internal::Adaptor::AsyncTaskManager&>(handle);
181 /********************************************************************************/
182 /********************************* INTERNAL CLASS *****************************/
183 /********************************************************************************/
// CompleteTask() requests a main-thread trigger once mCompletedTasks holds at least this many entries.
187 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
// Default worker-thread count, and the environment variable the constructor passes to GetNumberOfThreads() to override it.
189 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
190 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
192 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
194 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
195 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
196 constexpr auto MAX_NUMBER_OF_THREADS = 16u;
197 DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
198 return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
// Per-thread manager handle: filled in lazily by AsyncTaskManager::Get() and reset by DestroyAsyncTaskManager().
201 thread_local Dali::AsyncTaskManager gAsyncTaskManager;
// Worker thread that repeatedly pulls a task from the owning AsyncTaskManager and hands it back via CompleteTask().
// NOTE(review): short lines of this class (braces, some member declarations, the signatures of the
// request and entry functions, and some statements) are not visible in this excerpt.
205 * The worker thread for async process
207 class AsyncTaskThread : public Thread
// Constructor: takes the next id from the global atomic counter; the thread is not marked as started here.
213 AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
214 : mConditionalWait(),
215 mAsyncTaskManager(asyncTaskManager),
216 mThreadId(++gThreadId),
217 mDestroyThread(false),
218 mIsThreadStarted(false),
// Destructor: sets the destroy flag under the conditional-wait lock and notifies, so a waiting worker can leave its loop.
226 ~AsyncTaskThread() override
230 ConditionalWait::ScopedLock lock(mConditionalWait);
231 mDestroyThread = true;
232 mConditionalWait.Notify(lock);
239 * @brief Request the thread to process the task.
240 * @return True if the request is successed, otherwise false.
// First request: mark the thread as started (the statement that starts it is not visible here).
244 if(!mIsThreadStarted)
247 mIsThreadStarted = true;
251 // Lock while adding task to the queue
252 ConditionalWait::ScopedLock lock(mConditionalWait);
// Mark the thread busy before waking it.
256 mIsThreadIdle = false;
258 // wake up the thread
259 mConditionalWait.Notify(lock);
269 * The entry function of the worker thread.
// Builds a thread name of the form "AsyncTaskThread[<id>]" into a local buffer.
275 snprintf(temp, 100, "AsyncTaskThread[%u]", mThreadId);
// Main loop: runs until the destructor sets mDestroyThread.
279 while(!mDestroyThread)
281 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
// Idle path: mark the thread idle and sleep until notified (the guarding condition is not visible here).
284 ConditionalWait::ScopedLock lock(mConditionalWait);
287 mIsThreadIdle = true;
288 mConditionalWait.Wait(lock);
// Hand the task back to the manager; ownership is moved.
296 mAsyncTaskManager.CompleteTask(std::move(task));
// Non-copyable.
304 AsyncTaskThread(const AsyncTaskThread& thread) = delete;
307 AsyncTaskThread& operator=(const AsyncTaskThread& thread) = delete;
310 ConditionalWait mConditionalWait;
311 AsyncTaskManager& mAsyncTaskManager;
314 bool mIsThreadStarted;
// Bookkeeping for "tasks completed" callbacks.
// mTasksCompletedCallbackList maps a TasksCompletedId to its callback plus the tasks still being traced for it.
// Once an entry has no traced task left, its callback is moved to mExcuteCallbackList and executed by EmitCompletedTasks().
// NOTE(review): short lines (braces, returns, some statements) are not visible in this excerpt.
318 // AsyncTaskManager::TasksCompletedImpl
320 struct AsyncTaskManager::TasksCompletedImpl
322 TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
325 mEmitCompletedTaskTriggered(false)
331 * @brief Create new tasks completed id and.
332 * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
333 * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
335 Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
337 // Lock while adding tasks completed callback list to the queue
338 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
// The id is the current counter value; the counter is post-incremented for the next caller.
340 auto id = mTasksCompletedCount++;
341 DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
// CallbackData wraps the raw callback pointer in a std::unique_ptr (see mCallback below).
343 mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
348 * @brief Append task that will be trace.
349 * @post RemoveTaskTrace should be called.
350 * @param[in] id The id of tasks completed.
351 * @param[in] task The task want to trace.
353 void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
355 // Lock while adding tasks completed callback list to the queue
356 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
358 auto iter = mTasksCompletedCallbackList.find(id);
359 if(iter == mTasksCompletedCallbackList.end())
361 // This task is already erased. Ignore.
365 auto& callbackData = iter->second;
// Traced tasks are keyed by raw task pointer; the mapped value is a per-task count.
367 auto jter = callbackData.mTasks.find(task.Get());
369 if(jter != callbackData.mTasks.end())
371 // Increase reference count.
// First trace of this task for this id: start its count at 1.
376 callbackData.mTasks.insert({task.Get(), 1u});
381 * @brief Remove all task that were traced.
382 * @param[in] task The task want to remove trace.
383 * @param[in] taskCount The number of tasks that will be removed.
385 void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
392 // Lock while removing tasks completed callback list to the queue
393 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
// Walk every registered id; the iterator is advanced by hand because entries can be erased inside the loop.
395 for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
397 auto& callbackData = iter->second;
398 bool eraseCallbackData = false;
400 auto jter = callbackData.mTasks.find(task.Get());
402 if(jter != callbackData.mTasks.end())
// The stored count is fully consumed: stop tracing this task for this id.
404 if(jter->second <= count)
406 callbackData.mTasks.erase(jter);
// That was the last traced task for this id.
408 if(callbackData.mTasks.empty())
410 eraseCallbackData = true;
412 // Move callback base into list.
413 // (To avoid task container changed during callback emit)
414 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
416 iter = mTasksCompletedCallbackList.erase(iter);
// Otherwise only reduce the stored count.
421 jter->second -= count;
425 if(!eraseCallbackData)
433 * @brief Check whether current TasksCompletedId completed or not.
434 * @param[in] id The id of tasks completed.
435 * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
437 bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
439 // Lock while removing tasks completed callback list to the queue
440 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
442 auto iter = mTasksCompletedCallbackList.find(id);
443 if(iter != mTasksCompletedCallbackList.end())
445 auto& callbackData = iter->second;
// Nothing is traced for this id, so its callback can be queued for execution right away.
446 if(callbackData.mTasks.empty())
448 // Move callback base into list.
449 // (To avoid task container changed during callback emit)
450 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
452 iter = mTasksCompletedCallbackList.erase(iter);
462 * @brief Remove taskS completed callbacks by id.
463 * @param[in] id The id of taskS completed.
464 * @return True if taskS completed id removed. False otherwise.
466 bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
468 // Lock while removing taskS completed callback list to the queue
469 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
471 auto iter = mTasksCompletedCallbackList.find(id);
472 if(iter == mTasksCompletedCallbackList.end())
474 // This task is already erased, or completed.
475 // Erase from completed excute callback list.
477 // Lock while removing excute callback list to the queue
478 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
// Look for a callback with this id that is queued for execution but not yet emitted.
480 for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
482 if(iter->second == id)
484 iter = mExcuteCallbackList.erase(iter);
494 // This task is alread erased and completed. Ignore.
// The id is still registered: drop its entry together with its callback.
498 mTasksCompletedCallbackList.erase(iter);
504 * @brief Emit all completed callbacks.
505 * @note This API should be called at event thread.
507 void EmitCompletedTasks()
509 ExecuteCallbackContainer executeCallbackList;
511 // Lock while removing excute callback list to the queue
512 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
// Clear the flag so the next RegisterTasksCompletedCallback() sees it unset again.
514 mEmitCompletedTaskTriggered = false;
516 // Copy callback lists, for let we execute callbacks out of mutex
517 executeCallbackList = std::move(mExcuteCallbackList);
518 mExcuteCallbackList.clear();
521 if(!executeCallbackList.empty())
523 // Execute all callbacks
524 for(auto&& callbackPair : executeCallbackList)
526 auto& callback = callbackPair.first;
527 auto id = callbackPair.second;
// Each callback is executed with its TasksCompletedId as the argument.
529 Dali::CallbackBase::Execute(*callback, id);
535 * @brief Check whether there is some completed signal what we need to trace, or not.
536 * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
538 bool IsTasksCompletedCallbackExist()
540 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
541 return !mTasksCompletedCallbackList.empty();
545 * @brief Check whether there is some completed signal what we need to execute, or not.
546 * @return True if mExcuteCallbackList is not empty. False otherwise.
548 bool IsExecuteCallbackExist()
550 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
551 return !mExcuteCallbackList.empty();
// Queues a callback for execution; called from RemoveTaskTrace() and CheckTasksCompletedCallbackCompleted()
// while mTasksCompletedCallbacksMutex is already held.
555 void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
557 // Lock while adding excute callback list to the queue
558 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
560 mExcuteCallbackList.emplace_back(std::move(callback), id);
// Set the flag only on the first registration since the last emit (any further action in this branch is not visible here).
562 if(!mEmitCompletedTaskTriggered)
564 mEmitCompletedTaskTriggered = true;
// CallbackData: owns one callback plus the map of tasks traced for it. Move-only.
573 CallbackData(CallbackBase* callback)
574 : mCallback(callback),
579 CallbackData(CallbackData&& rhs) noexcept
580 : mCallback(std::move(rhs.mCallback)),
581 mTasks(std::move(rhs.mTasks))
585 CallbackData& operator=(CallbackData&& rhs) noexcept
589 mCallback = std::move(rhs.mCallback);
590 mTasks = std::move(rhs.mTasks);
597 // Delete copy operator.
598 CallbackData(const CallbackData& rhs) = delete;
599 CallbackData& operator=(const CallbackData& rhs) = delete;
602 std::unique_ptr<CallbackBase> mCallback;
603 std::unordered_map<const AsyncTask*, uint32_t> mTasks;
607 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
608 EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
// Source of new TasksCompletedId values (see GenerateTasksCompletedId).
610 Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
612 using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
613 TasksCompletedContainer mTasksCompletedCallbackList;
615 using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
616 ExecuteCallbackContainer mExcuteCallbackList;
618 Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
619 Dali::Mutex mExcuteCallbacksMutex; ///< Mutex for mExcuteCallbackList.
// Set by RegisterTasksCompletedCallback(), cleared by EmitCompletedTasks().
621 bool mEmitCompletedTaskTriggered : 1;
626 Dali::AsyncTaskManager AsyncTaskManager::Get()
628 if(!gAsyncTaskManager)
630 gAsyncTaskManager = Dali::AsyncTaskManager(new AsyncTaskManager());
632 return gAsyncTaskManager;
635 AsyncTaskManager::AsyncTaskManager()
636 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
637 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TaskCompleted))),
638 mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get()))
// Destructor: releases the tasks-completed bookkeeping, then clears the three task queues.
// NOTE(review): the lines between the signature and the first comment are not visible in this excerpt;
// presumably they stop/join the worker threads, as the comment below implies — confirm.
642 AsyncTaskManager::~AsyncTaskManager()
647 // Remove task completed impl after all threads are join.
648 mTasksCompletedImpl.reset();
651 mWaitingTasks.clear();
652 mRunningTasks.clear();
653 mCompletedTasks.clear();
// Appends the task to the waiting queue, then asks worker helpers in round-robin order to process it.
// NOTE(review): braces, early returns and the loop-index declaration are not visible in this excerpt.
656 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
660 // Lock while adding task to the queue
661 Mutex::ScopedLock lock(mWaitingTasksMutex);
663 // push back into waiting queue.
664 mWaitingTasks.insert(mWaitingTasks.end(), task);
668 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
670 // Finish all Running threads are working
// True when there are at least as many running tasks as worker helpers.
671 if(mRunningTasks.size() >= mTasks.GetElementCount())
// Try each helper at most once.
678 size_t count = mTasks.GetElementCount();
680 while(index++ < count)
682 auto processHelperIt = mTasks.GetNext();
683 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
684 if(processHelperIt->Request())
688 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
// Withdraws a task from all three queues: erased from waiting and completed, only marked CANCELED in running.
// removedCount is later passed to RemoveTaskTrace(); the statements that increment it are not visible in this excerpt.
692 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
696 uint32_t removedCount = 0u;
699 // Lock while remove task from the queue
700 Mutex::ScopedLock lock(mWaitingTasksMutex);
// Erase every occurrence of the task from the waiting queue.
702 for(auto iterator = mWaitingTasks.begin(); iterator != mWaitingTasks.end();)
704 if((*iterator) == task)
706 iterator = mWaitingTasks.erase(iterator);
717 // Lock while remove task from the queue
718 Mutex::ScopedLock lock(mRunningTasksMutex);
720 for(auto iterator = mRunningTasks.begin(); iterator != mRunningTasks.end();)
722 if((*iterator).first == task)
724 // We cannot erase container. Just mark as canceled.
725 // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
726 if((*iterator).second == RunningTaskState::RUNNING)
728 (*iterator).second = RunningTaskState::CANCELED;
737 // Lock while remove task from the queue
738 Mutex::ScopedLock lock(mCompletedTasksMutex);
// Erase the task from the completed queue; entries still requiring a callback get extra handling (not visible here).
740 for(auto iterator = mCompletedTasks.begin(); iterator != mCompletedTasks.end();)
742 if((*iterator).first == task)
744 if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
748 iterator = mCompletedTasks.erase(iterator);
757 // Remove TasksCompleted callback trace
758 if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
760 mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
// Registers a tasks-completed callback and starts tracing every current task that matches the mask.
// Waiting tasks, RUNNING running tasks and REQUIRE_CALLBACK completed tasks are considered.
// A task matches when both of its bits (thread type and priority) are contained in mask.
// Returns the id generated for the callback.
// NOTE(review): the statements that set and test taskAdded are not visible in this excerpt.
765 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
767 // mTasksCompletedImpl will take ownership of callback.
768 Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
770 bool taskAdded = false; ///< Flag whether at least one task tracing now.
772 // Please be careful the order of mutex, to avoid dead lock.
// All three queue mutexes are taken, in the order waiting -> running -> completed.
774 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
776 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
778 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
780 // Collect all tasks from waiting tasks
781 for(auto& task : mWaitingTasks)
// checkMask = the task's thread-type bit OR'ed with its priority bit.
783 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
784 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
786 if((checkMask & mask) == checkMask)
789 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
793 // Collect all tasks from running tasks
794 for(auto& taskPair : mRunningTasks)
796 // Trace only if it is running now.
797 if(taskPair.second == RunningTaskState::RUNNING)
799 auto& task = taskPair.first;
800 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
801 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
803 if((checkMask & mask) == checkMask)
806 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
811 // Collect all tasks from complete tasks
812 for(auto& taskPair : mCompletedTasks)
814 // Trace only if it is need callback.
815 // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
816 // If worker thread invocation, than it already remove trace at completed timing.
817 // If canceled cases, we don't append trace at running tasks already.
818 // So, we don't need to trace for SKIP_CALLBACK cases.
819 if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
821 auto& task = taskPair.first;
822 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
823 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
825 if((checkMask & mask) == checkMask)
828 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
836 // If there is nothing to check task, just excute callback right now.
// Queues the callback for execution if no task ended up traced for this id.
839 mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
841 return tasksCompletedId;
844 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
846 return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
// Pops entries from the front of the completed queue until one that requires a callback is found.
// Returns that task, or a null pointer if none is found.
// Popped tasks that need no callback are parked in ignoredTaskList, which is declared before the lock is taken.
// NOTE(review): braces and the loop-exit statement are not visible in this excerpt.
849 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
851 std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
853 AsyncTaskPtr nextCompletedTask = nullptr;
855 // Lock while popping task out from the queue
856 Mutex::ScopedLock lock(mCompletedTasksMutex);
858 while(!mCompletedTasks.empty())
// Copy the front entry out before erasing it from the list.
860 auto next = mCompletedTasks.begin();
861 AsyncTaskPtr nextTask = next->first;
862 CompletedTaskState taskState = next->second;
863 mCompletedTasks.erase(next);
865 if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
867 nextCompletedTask = nextTask;
871 ignoredTaskList.push_back(nextTask);
875 return nextCompletedTask;
878 void AsyncTaskManager::TaskCompleted()
880 // For UTC, let we complete only 1 task here.
881 if(AsyncTaskPtr task = PopNextCompletedTask())
883 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
885 // Remove TasksCompleted callback trace
886 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
888 mTasksCompletedImpl->RemoveTaskTrace(task);
892 mTasksCompletedImpl->EmitCompletedTasks();
895 void AsyncTaskManager::TaskAllCompleted()
897 while(AsyncTaskPtr task = PopNextCompletedTask())
899 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
901 // Remove TasksCompleted callback trace
902 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
904 mTasksCompletedImpl->RemoveTaskTrace(task);
908 mTasksCompletedImpl->EmitCompletedTasks();
// Scans the waiting queue in order for a task whose IsReady() is true.
// The found task is appended to the running queue as RUNNING and erased from the waiting queue.
// NOTE(review): the assignment to nextTask, the loop exit and the return are not visible in this excerpt.
911 /// Worker thread called
912 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
914 // Lock while popping task out from the queue
915 Mutex::ScopedLock lock(mWaitingTasksMutex);
917 // pop out the next task from the queue
918 AsyncTaskPtr nextTask = nullptr;
920 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
922 if((*iter)->IsReady())
928 // Lock while popping task out from the queue
929 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
931 mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
// Erasing invalidates iter; the original presumably leaves the loop right after this — confirm.
933 mWaitingTasks.erase(iter);
// Called with a task the worker has finished with. Visible steps:
//  1. Under mRunningTasksMutex, look the task up in the running queue and check whether it is still RUNNING.
//  2. Outside the lock, for WORKER_THREAD tasks with notify set: run the completed callback and remove the task's trace.
//  3. Under mRunningTasksMutex then mCompletedTasksMutex, move the task from running to completed.
//     It is marked REQUIRE_CALLBACK only when notify is set and the callback thread is MAIN_THREAD.
//  4. Trigger the main thread.
// NOTE(review): the declaration and assignment of 'notify', and the guard around the final trigger,
// are not visible in this excerpt.
942 /// Worker thread called
943 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
949 bool needTrigger = false;
951 // Lock while check validation of task.
953 Mutex::ScopedLock lock(mRunningTasksMutex);
955 auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
956 if(iter != mRunningTasks.end())
958 if(iter->second == RunningTaskState::RUNNING)
960 // This task is valid.
966 // We should execute this tasks complete callback out of mutex
967 if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
969 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
971 // We need to remove task trace now.
972 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
974 mTasksCompletedImpl->RemoveTaskTrace(task);
976 if(mTasksCompletedImpl->IsExecuteCallbackExist())
978 // We need to call EmitCompletedTasks(). Trigger main thread.
984 // Lock while adding task to the queue
986 Mutex::ScopedLock lock(mRunningTasksMutex);
// Look the task up again: the running queue was unlocked while the callback above ran.
988 auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
989 if(iter != mRunningTasks.end())
991 // Move task into completed, for ensure that AsyncTask destroy at main thread.
993 Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
995 const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
997 needTrigger |= callbackRequired;
999 mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1001 mRunningTasks.erase(iter);
// Also request a trigger when the completed queue has reached the threshold.
1005 needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1008 // Now, task is invalidate.
1014 // Wake up the main thread
1017 mTrigger->Trigger();
1022 // AsyncTaskManager::TaskHelper
1024 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1025 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
// Move constructor: delegates to the main constructor, taking over rhs's worker thread and binding to the same manager.
1029 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1030 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
// Main constructor used by the other two: takes ownership of the worker thread and stores the manager reference.
1034 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1035 : mProcessor(std::move(processor)),
1036 mAsyncTaskManager(asyncTaskManager)
1040 bool AsyncTaskManager::TaskHelper::Request()
1042 return mProcessor->Request();
1045 } // namespace Adaptor
1047 } // namespace Internal
1049 /********************************************************************************/
1050 /********************************* PUBLIC CLASS *******************************/
1051 /********************************************************************************/
// Public handle class: default construction and destruction are compiler-generated.
1053 AsyncTaskManager::AsyncTaskManager() = default;
1055 AsyncTaskManager::~AsyncTaskManager() = default;
1057 AsyncTaskManager AsyncTaskManager::Get()
1059 return Internal::Adaptor::AsyncTaskManager::Get();
1062 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
1064 Internal::Adaptor::GetImplementation(*this).AddTask(task);
1067 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
1069 Internal::Adaptor::GetImplementation(*this).RemoveTask(task);
1072 AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, AsyncTaskManager::CompletedCallbackTraceMask mask)
1074 return Internal::Adaptor::GetImplementation(*this).SetCompletedCallback(callback, mask);
1077 bool AsyncTaskManager::RemoveCompletedCallback(AsyncTaskManager::TasksCompletedId tasksCompletedId)
1079 return Internal::Adaptor::GetImplementation(*this).RemoveCompletedCallback(tasksCompletedId);
// Constructs the public handle from an implementation pointer.
// NOTE(review): the initializer list and body are not visible in this excerpt; presumably the pointer is passed to the base handle — confirm.
1082 AsyncTaskManager::AsyncTaskManager(Internal::Adaptor::AsyncTaskManager* impl)
1091 namespace AsyncTaskManager
// Test helper: resets the thread-local manager handle, so the next Get() creates a new implementation.
1093 void DestroyAsyncTaskManager()
1095 Dali::Internal::Adaptor::gAsyncTaskManager.Reset();
1098 void ProcessSingleCompletedTask()
1100 auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1101 Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskCompleted();
1104 void ProcessAllCompletedTask()
1106 auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1107 Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskAllCompleted();
1109 } // namespace AsyncTaskManager