2 * Copyright (c) 2023 Samsung Electronics Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <toolkit-async-task-manager.h>
22 #include <dali/devel-api/adaptor-framework/thread-settings.h>
23 #include <dali/devel-api/threading/conditional-wait.h>
24 #include <dali/devel-api/threading/mutex.h>
25 #include <dali/devel-api/threading/thread.h>
26 #include <dali/public-api/adaptor-framework/async-task-manager.h>
27 #include <dali/public-api/adaptor-framework/round-robin-container-view.h>
28 #include <dali/public-api/common/list-wrapper.h>
29 #include <dali/public-api/object/base-object.h>
31 #include <unordered_map>
34 #include <toolkit-application.h>
35 #include <toolkit-environment-variable.h>
36 #include <toolkit-event-thread-callback.h>
37 #include "dali-test-suite-utils.h"
47 std::atomic_uint32_t gThreadId = 0u;
50 class AsyncTaskThread;
52 class AsyncTaskManager : public Dali::BaseObject
55 static Dali::AsyncTaskManager Get();
61 void AddTask(AsyncTaskPtr task);
62 void RemoveTask(AsyncTaskPtr task);
64 Dali::AsyncTaskManager::TasksCompletedId SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask);
65 bool RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId);
68 AsyncTaskPtr PopNextCompletedTask();
70 void TaskAllCompleted();
72 public: // Worker thread called method
73 AsyncTaskPtr PopNextTaskToProcess();
74 void CompleteTask(AsyncTaskPtr&& task);
77 * @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
83 * @brief Create an TaskHelper.
85 * @param[in] asyncTaskManager Reference to the AsyncTaskManager
87 TaskHelper(AsyncTaskManager& asyncTaskManager);
90 * @brief Request the thread to process the task.
91 * @return True if the request succeeds, otherwise false.
96 TaskHelper(const TaskHelper&) = delete;
97 TaskHelper& operator=(const TaskHelper&) = delete;
99 TaskHelper(TaskHelper&& rhs);
100 TaskHelper& operator=(TaskHelper&& rhs) = delete;
104 * @brief Main constructor that used by all other constructors
106 TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager);
109 std::unique_ptr<AsyncTaskThread> mProcessor;
110 AsyncTaskManager& mAsyncTaskManager;
114 * @brief State of running task
116 enum class RunningTaskState
118 RUNNING = 0, ///< Running task
119 CANCELED = 1, ///< Canceled by user
123 * @brief State of complete task
125 enum class CompletedTaskState
127 REQUIRE_CALLBACK = 0, ///< Need to execute callback when completed task process.
128 SKIP_CALLBACK = 1, ///< Do not execute callback
133 AsyncTaskManager(const AsyncTaskManager& manager);
136 AsyncTaskManager& operator=(const AsyncTaskManager& manager);
139 // Keep Task as list since we take tasks by FIFO as default.
140 using AsyncTaskContainer = std::list<AsyncTaskPtr>;
142 using AsyncRunningTaskPair = std::pair<AsyncTaskPtr, RunningTaskState>;
143 using AsyncRunningTaskContainer = std::list<AsyncRunningTaskPair>;
145 using AsyncCompletedTaskPair = std::pair<AsyncTaskPtr, CompletedTaskState>;
146 using AsyncCompletedTaskContainer = std::list<AsyncCompletedTaskPair>;
148 AsyncTaskContainer mWaitingTasks; ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
149 AsyncRunningTaskContainer mRunningTasks; ///< The queue of the running tasks. Must be locked under mRunningTasksMutex.
150 AsyncCompletedTaskContainer mCompletedTasks; ///< The queue of the tasks with the async process. Must be locked under mCompletedTasksMutex.
152 RoundRobinContainerView<TaskHelper> mTasks;
154 Dali::Mutex mWaitingTasksMutex; ///< Mutex for mWaitingTasks. We can lock mRunningTasksMutex and mCompletedTasksMutex under this scope.
155 Dali::Mutex mRunningTasksMutex; ///< Mutex for mRunningTasks. We can lock mCompletedTasksMutex under this scope.
156 Dali::Mutex mCompletedTasksMutex; ///< Mutex for mCompletedTasks. We cannot lock any mutex under this scope.
158 std::unique_ptr<EventThreadCallback> mTrigger;
160 struct TasksCompletedImpl;
161 std::unique_ptr<TasksCompletedImpl> mTasksCompletedImpl; ///< TaskS completed signal interface for AsyncTaskManager.
164 inline Internal::Adaptor::AsyncTaskManager& GetImplementation(Dali::AsyncTaskManager& obj)
166 DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
168 Dali::BaseObject& handle = obj.GetBaseObject();
170 return static_cast<Internal::Adaptor::AsyncTaskManager&>(handle);
173 inline const Internal::Adaptor::AsyncTaskManager& GetImplementation(const Dali::AsyncTaskManager& obj)
175 DALI_ASSERT_ALWAYS(obj && "AsyncTaskManager is empty");
177 const Dali::BaseObject& handle = obj.GetBaseObject();
179 return static_cast<const Internal::Adaptor::AsyncTaskManager&>(handle);
182 /********************************************************************************/
183 /********************************* INTERNAL CLASS *****************************/
184 /********************************************************************************/
188 constexpr auto FORCE_TRIGGER_THRESHOLD = 128u; ///< Trigger TasksCompleted() forcely if the number of completed task contain too much.
190 constexpr auto DEFAULT_NUMBER_OF_ASYNC_THREADS = size_t{8u};
191 constexpr auto NUMBER_OF_ASYNC_THREADS_ENV = "DALI_ASYNC_MANAGER_THREAD_POOL_SIZE";
193 size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
195 auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
196 auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
197 constexpr auto MAX_NUMBER_OF_THREADS = 16u;
198 DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
199 return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
202 thread_local Dali::AsyncTaskManager gAsyncTaskManager;
206 * The worker thread for async process
208 class AsyncTaskThread : public Thread
214 AsyncTaskThread(AsyncTaskManager& asyncTaskManager)
215 : mConditionalWait(),
216 mAsyncTaskManager(asyncTaskManager),
217 mThreadId(++gThreadId),
218 mDestroyThread(false),
219 mIsThreadStarted(false),
227 ~AsyncTaskThread() override
231 ConditionalWait::ScopedLock lock(mConditionalWait);
232 mDestroyThread = true;
233 mConditionalWait.Notify(lock);
240 * @brief Request the thread to process the task.
241 * @return True if the request is successed, otherwise false.
245 if(!mIsThreadStarted)
248 mIsThreadStarted = true;
252 // Lock while adding task to the queue
253 ConditionalWait::ScopedLock lock(mConditionalWait);
257 mIsThreadIdle = false;
259 // wake up the thread
260 mConditionalWait.Notify(lock);
270 * The entry function of the worker thread.
276 snprintf(temp, 100, "AsyncTaskThread[%u]", mThreadId);
280 while(!mDestroyThread)
282 AsyncTaskPtr task = mAsyncTaskManager.PopNextTaskToProcess();
285 ConditionalWait::ScopedLock lock(mConditionalWait);
288 mIsThreadIdle = true;
289 mConditionalWait.Wait(lock);
294 tet_printf("BEGIN: AsyncTask[%s] Process\n", task->GetTaskName().data());
296 tet_printf("END: AsyncTask[%s] Process\n", task->GetTaskName().data());
299 mAsyncTaskManager.CompleteTask(std::move(task));
307 AsyncTaskThread(const AsyncTaskThread& thread) = delete;
310 AsyncTaskThread& operator=(const AsyncTaskThread& thread) = delete;
313 ConditionalWait mConditionalWait;
314 AsyncTaskManager& mAsyncTaskManager;
317 bool mIsThreadStarted;
321 // AsyncTaskManager::TasksCompletedImpl
323 struct AsyncTaskManager::TasksCompletedImpl
325 TasksCompletedImpl(AsyncTaskManager& manager, EventThreadCallback* trigger)
328 mEmitCompletedTaskTriggered(false)
334 * @brief Create new tasks completed id and.
335 * @post AppendTaskTrace or CheckTasksCompletedCallbackCompleted should be called.
336 * @param[in] callback The callback that want to be executed when we notify that all tasks completed.
338 Dali::AsyncTaskManager::TasksCompletedId GenerateTasksCompletedId(CallbackBase* callback)
340 // Lock while adding tasks completed callback list to the queue
341 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
343 auto id = mTasksCompletedCount++;
344 DALI_ASSERT_ALWAYS(mTasksCompletedCallbackList.find(id) == mTasksCompletedCallbackList.end());
346 mTasksCompletedCallbackList.insert({id, CallbackData(callback)});
351 * @brief Append task that will be trace.
352 * @post RemoveTaskTrace should be called.
353 * @param[in] id The id of tasks completed.
354 * @param[in] task The task want to trace.
356 void AppendTaskTrace(Dali::AsyncTaskManager::TasksCompletedId id, AsyncTaskPtr task)
358 // Lock while adding tasks completed callback list to the queue
359 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
361 auto iter = mTasksCompletedCallbackList.find(id);
362 if(iter == mTasksCompletedCallbackList.end())
364 // This task is already erased. Ignore.
368 auto& callbackData = iter->second;
370 auto jter = callbackData.mTasks.find(task.Get());
372 if(jter != callbackData.mTasks.end())
374 // Increase reference count.
379 callbackData.mTasks.insert({task.Get(), 1u});
384 * @brief Remove all task that were traced.
385 * @param[in] task The task want to remove trace.
386 * @param[in] taskCount The number of tasks that will be removed.
388 void RemoveTaskTrace(AsyncTaskPtr task, uint32_t count = 1u)
395 // Lock while removing tasks completed callback list to the queue
396 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
398 for(auto iter = mTasksCompletedCallbackList.begin(); iter != mTasksCompletedCallbackList.end();)
400 auto& callbackData = iter->second;
401 bool eraseCallbackData = false;
403 auto jter = callbackData.mTasks.find(task.Get());
405 if(jter != callbackData.mTasks.end())
407 if(jter->second <= count)
409 callbackData.mTasks.erase(jter);
411 if(callbackData.mTasks.empty())
413 eraseCallbackData = true;
415 // Move callback base into list.
416 // (To avoid task container changed during callback emit)
417 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
419 iter = mTasksCompletedCallbackList.erase(iter);
424 jter->second -= count;
428 if(!eraseCallbackData)
436 * @brief Check whether current TasksCompletedId completed or not.
437 * @param[in] id The id of tasks completed.
438 * @return True if all tasks are completed so we need to execute callback soon. False otherwise.
440 bool CheckTasksCompletedCallbackCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
442 // Lock while removing tasks completed callback list to the queue
443 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
445 auto iter = mTasksCompletedCallbackList.find(id);
446 if(iter != mTasksCompletedCallbackList.end())
448 auto& callbackData = iter->second;
449 if(callbackData.mTasks.empty())
451 // Move callback base into list.
452 // (To avoid task container changed during callback emit)
453 RegisterTasksCompletedCallback(std::move(callbackData.mCallback), iter->first);
455 iter = mTasksCompletedCallbackList.erase(iter);
465 * @brief Remove taskS completed callbacks by id.
466 * @param[in] id The id of taskS completed.
467 * @return True if taskS completed id removed. False otherwise.
469 bool RemoveTasksCompleted(Dali::AsyncTaskManager::TasksCompletedId id)
471 // Lock while removing taskS completed callback list to the queue
472 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
474 auto iter = mTasksCompletedCallbackList.find(id);
475 if(iter == mTasksCompletedCallbackList.end())
477 // This task is already erased, or completed.
478 // Erase from completed excute callback list.
480 // Lock while removing excute callback list to the queue
481 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
483 for(auto iter = mExcuteCallbackList.begin(); iter != mExcuteCallbackList.end();)
485 if(iter->second == id)
487 iter = mExcuteCallbackList.erase(iter);
497 // This task is alread erased and completed. Ignore.
501 mTasksCompletedCallbackList.erase(iter);
507 * @brief Emit all completed callbacks.
508 * @note This API should be called at event thread.
510 void EmitCompletedTasks()
512 ExecuteCallbackContainer executeCallbackList;
514 // Lock while removing excute callback list to the queue
515 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
517 mEmitCompletedTaskTriggered = false;
519 // Copy callback lists, for let we execute callbacks out of mutex
520 executeCallbackList = std::move(mExcuteCallbackList);
521 mExcuteCallbackList.clear();
524 if(!executeCallbackList.empty())
526 // Execute all callbacks
527 for(auto&& callbackPair : executeCallbackList)
529 auto& callback = callbackPair.first;
530 auto id = callbackPair.second;
532 Dali::CallbackBase::Execute(*callback, id);
538 * @brief Check whether there is some completed signal what we need to trace, or not.
539 * @return True if mTasksCompletedCallbackList is not empty. False otherwise.
541 bool IsTasksCompletedCallbackExist()
543 Mutex::ScopedLock lock(mTasksCompletedCallbacksMutex);
544 return !mTasksCompletedCallbackList.empty();
548 * @brief Check whether there is some completed signal what we need to execute, or not.
549 * @return True if mExcuteCallbackList is not empty. False otherwise.
551 bool IsExecuteCallbackExist()
553 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
554 return !mExcuteCallbackList.empty();
558 void RegisterTasksCompletedCallback(std::unique_ptr<CallbackBase> callback, Dali::AsyncTaskManager::TasksCompletedId id)
560 // Lock while adding excute callback list to the queue
561 Mutex::ScopedLock lock(mExcuteCallbacksMutex);
563 mExcuteCallbackList.emplace_back(std::move(callback), id);
565 if(!mEmitCompletedTaskTriggered)
567 mEmitCompletedTaskTriggered = true;
576 CallbackData(CallbackBase* callback)
577 : mCallback(callback),
582 CallbackData(CallbackData&& rhs) noexcept
583 : mCallback(std::move(rhs.mCallback)),
584 mTasks(std::move(rhs.mTasks))
588 CallbackData& operator=(CallbackData&& rhs) noexcept
592 mCallback = std::move(rhs.mCallback);
593 mTasks = std::move(rhs.mTasks);
600 // Delete copy operator.
601 CallbackData(const CallbackData& rhs) = delete;
602 CallbackData& operator=(const CallbackData& rhs) = delete;
605 std::unique_ptr<CallbackBase> mCallback;
606 std::unordered_map<const AsyncTask*, uint32_t> mTasks;
610 AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
611 EventThreadCallback* mTrigger; ///< EventThread callback trigger. (Not owned.)
613 Dali::AsyncTaskManager::TasksCompletedId mTasksCompletedCount{0u};
615 using TasksCompletedContainer = std::unordered_map<Dali::AsyncTaskManager::TasksCompletedId, CallbackData>;
616 TasksCompletedContainer mTasksCompletedCallbackList;
618 using ExecuteCallbackContainer = std::vector<std::pair<std::unique_ptr<CallbackBase>, Dali::AsyncTaskManager::TasksCompletedId>>;
619 ExecuteCallbackContainer mExcuteCallbackList;
621 Dali::Mutex mTasksCompletedCallbacksMutex; ///< Mutex for mTasksCompletedCallbackList. We can lock mExcuteCallbacksMutex under this scope.
622 Dali::Mutex mExcuteCallbacksMutex; ///< Mutex for mExcuteCallbackList.
624 bool mEmitCompletedTaskTriggered : 1;
629 Dali::AsyncTaskManager AsyncTaskManager::Get()
631 if(!gAsyncTaskManager)
633 gAsyncTaskManager = Dali::AsyncTaskManager(new AsyncTaskManager());
635 return gAsyncTaskManager;
638 AsyncTaskManager::AsyncTaskManager()
639 : mTasks(GetNumberOfThreads(NUMBER_OF_ASYNC_THREADS_ENV, DEFAULT_NUMBER_OF_ASYNC_THREADS), [&]() { return TaskHelper(*this); }),
640 mTrigger(new EventThreadCallback(MakeCallback(this, &AsyncTaskManager::TaskCompleted))),
641 mTasksCompletedImpl(new TasksCompletedImpl(*this, mTrigger.get()))
645 AsyncTaskManager::~AsyncTaskManager()
650 // Remove task completed impl after all threads are join.
651 mTasksCompletedImpl.reset();
654 mWaitingTasks.clear();
655 mRunningTasks.clear();
656 mCompletedTasks.clear();
659 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
663 // Lock while adding task to the queue
664 Mutex::ScopedLock lock(mWaitingTasksMutex);
666 // push back into waiting queue.
667 mWaitingTasks.insert(mWaitingTasks.end(), task);
671 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
673 // Finish all Running threads are working
674 if(mRunningTasks.size() >= mTasks.GetElementCount())
681 size_t count = mTasks.GetElementCount();
683 while(index++ < count)
685 auto processHelperIt = mTasks.GetNext();
686 DALI_ASSERT_ALWAYS(processHelperIt != mTasks.End());
687 if(processHelperIt->Request())
691 // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
695 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
699 uint32_t removedCount = 0u;
702 // Lock while remove task from the queue
703 Mutex::ScopedLock lock(mWaitingTasksMutex);
705 for(auto iterator = mWaitingTasks.begin(); iterator != mWaitingTasks.end();)
707 if((*iterator) == task)
709 iterator = mWaitingTasks.erase(iterator);
720 // Lock while remove task from the queue
721 Mutex::ScopedLock lock(mRunningTasksMutex);
723 for(auto iterator = mRunningTasks.begin(); iterator != mRunningTasks.end();)
725 if((*iterator).first == task)
727 // We cannot erase container. Just mark as canceled.
728 // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
729 if((*iterator).second == RunningTaskState::RUNNING)
731 (*iterator).second = RunningTaskState::CANCELED;
740 // Lock while remove task from the queue
741 Mutex::ScopedLock lock(mCompletedTasksMutex);
743 for(auto iterator = mCompletedTasks.begin(); iterator != mCompletedTasks.end();)
745 if((*iterator).first == task)
747 if((*iterator).second == CompletedTaskState::REQUIRE_CALLBACK)
751 iterator = mCompletedTasks.erase(iterator);
760 // Remove TasksCompleted callback trace
761 if(removedCount > 0u && mTasksCompletedImpl->IsTasksCompletedCallbackExist())
763 mTasksCompletedImpl->RemoveTaskTrace(task, removedCount);
768 Dali::AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, Dali::AsyncTaskManager::CompletedCallbackTraceMask mask)
770 // mTasksCompletedImpl will take ownership of callback.
771 Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId = mTasksCompletedImpl->GenerateTasksCompletedId(callback);
773 bool taskAdded = false; ///< Flag whether at least one task tracing now.
775 // Please be careful the order of mutex, to avoid dead lock.
777 Mutex::ScopedLock lockWait(mWaitingTasksMutex);
779 Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
781 Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
783 // Collect all tasks from waiting tasks
784 for(auto& task : mWaitingTasks)
786 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
787 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
789 if((checkMask & mask) == checkMask)
792 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
796 // Collect all tasks from running tasks
797 for(auto& taskPair : mRunningTasks)
799 // Trace only if it is running now.
800 if(taskPair.second == RunningTaskState::RUNNING)
802 auto& task = taskPair.first;
803 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
804 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
806 if((checkMask & mask) == checkMask)
809 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
814 // Collect all tasks from complete tasks
815 for(auto& taskPair : mCompletedTasks)
817 // Trace only if it is need callback.
818 // Note : There are two CompletedTaskState::SKIP_CALLBACK cases, worker thread invocation and canceled cases.
819 // If worker thread invocation, than it already remove trace at completed timing.
820 // If canceled cases, we don't append trace at running tasks already.
821 // So, we don't need to trace for SKIP_CALLBACK cases.
822 if(taskPair.second == CompletedTaskState::REQUIRE_CALLBACK)
824 auto& task = taskPair.first;
825 auto checkMask = (task->GetCallbackInvocationThread() == Dali::AsyncTask::ThreadType::MAIN_THREAD ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_MAIN : Dali::AsyncTaskManager::CompletedCallbackTraceMask::THREAD_MASK_WORKER) |
826 (task->GetPriorityType() == Dali::AsyncTask::PriorityType::HIGH ? Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_HIGH : Dali::AsyncTaskManager::CompletedCallbackTraceMask::PRIORITY_MASK_LOW);
828 if((checkMask & mask) == checkMask)
831 mTasksCompletedImpl->AppendTaskTrace(tasksCompletedId, task);
839 // If there is nothing to check task, just excute callback right now.
842 mTasksCompletedImpl->CheckTasksCompletedCallbackCompleted(tasksCompletedId);
844 return tasksCompletedId;
847 bool AsyncTaskManager::RemoveCompletedCallback(Dali::AsyncTaskManager::TasksCompletedId tasksCompletedId)
849 return mTasksCompletedImpl->RemoveTasksCompleted(tasksCompletedId);
852 AsyncTaskPtr AsyncTaskManager::PopNextCompletedTask()
854 std::vector<AsyncTaskPtr> ignoredTaskList; ///< To keep asyncTask reference so we can ensure that destructor called out of mutex.
856 AsyncTaskPtr nextCompletedTask = nullptr;
858 // Lock while popping task out from the queue
859 Mutex::ScopedLock lock(mCompletedTasksMutex);
861 while(!mCompletedTasks.empty())
863 auto next = mCompletedTasks.begin();
864 AsyncTaskPtr nextTask = next->first;
865 CompletedTaskState taskState = next->second;
866 mCompletedTasks.erase(next);
868 if(taskState == CompletedTaskState::REQUIRE_CALLBACK)
870 nextCompletedTask = nextTask;
874 ignoredTaskList.push_back(nextTask);
878 return nextCompletedTask;
881 void AsyncTaskManager::TaskCompleted()
883 // For UTC, let we complete only 1 task here.
884 if(AsyncTaskPtr task = PopNextCompletedTask())
886 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
888 // Remove TasksCompleted callback trace
889 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
891 mTasksCompletedImpl->RemoveTaskTrace(task);
895 mTasksCompletedImpl->EmitCompletedTasks();
898 void AsyncTaskManager::TaskAllCompleted()
900 while(AsyncTaskPtr task = PopNextCompletedTask())
902 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
904 // Remove TasksCompleted callback trace
905 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
907 mTasksCompletedImpl->RemoveTaskTrace(task);
911 mTasksCompletedImpl->EmitCompletedTasks();
914 /// Worker thread called
915 AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
917 // Lock while popping task out from the queue
918 Mutex::ScopedLock lock(mWaitingTasksMutex);
920 // pop out the next task from the queue
921 AsyncTaskPtr nextTask = nullptr;
923 for(auto iter = mWaitingTasks.begin(), endIter = mWaitingTasks.end(); iter != endIter; ++iter)
925 if((*iter)->IsReady())
931 // Lock while popping task out from the queue
932 Mutex::ScopedLock lock(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
934 mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
936 mWaitingTasks.erase(iter);
945 /// Worker thread called
946 void AsyncTaskManager::CompleteTask(AsyncTaskPtr&& task)
952 bool needTrigger = false;
954 // Lock while check validation of task.
956 Mutex::ScopedLock lock(mRunningTasksMutex);
958 auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
959 if(iter != mRunningTasks.end())
961 if(iter->second == RunningTaskState::RUNNING)
963 // This task is valid.
969 // We should execute this tasks complete callback out of mutex
970 if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD)
972 CallbackBase::Execute(*(task->GetCompletedCallback()), task);
974 // We need to remove task trace now.
975 if(mTasksCompletedImpl->IsTasksCompletedCallbackExist())
977 mTasksCompletedImpl->RemoveTaskTrace(task);
979 if(mTasksCompletedImpl->IsExecuteCallbackExist())
981 // We need to call EmitCompletedTasks(). Trigger main thread.
987 // Lock while adding task to the queue
989 Mutex::ScopedLock lock(mRunningTasksMutex);
991 auto iter = std::find_if(mRunningTasks.begin(), mRunningTasks.end(), [task](const AsyncRunningTaskPair& element) { return element.first == task; });
992 if(iter != mRunningTasks.end())
994 // Move task into completed, for ensure that AsyncTask destroy at main thread.
996 Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
998 const bool callbackRequired = notify && (task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD);
1000 needTrigger |= callbackRequired;
1002 mCompletedTasks.insert(mCompletedTasks.end(), std::make_pair(task, callbackRequired ? CompletedTaskState::REQUIRE_CALLBACK : CompletedTaskState::SKIP_CALLBACK));
1004 mRunningTasks.erase(iter);
1008 needTrigger |= (mCompletedTasks.size() >= FORCE_TRIGGER_THRESHOLD);
1011 // Now, task is invalidate.
1017 // Wake up the main thread
1020 mTrigger->Trigger();
1025 // AsyncTaskManager::TaskHelper
1027 AsyncTaskManager::TaskHelper::TaskHelper(AsyncTaskManager& asyncTaskManager)
1028 : TaskHelper(std::unique_ptr<AsyncTaskThread>(new AsyncTaskThread(asyncTaskManager)), asyncTaskManager)
1032 AsyncTaskManager::TaskHelper::TaskHelper(TaskHelper&& rhs)
1033 : TaskHelper(std::move(rhs.mProcessor), rhs.mAsyncTaskManager)
1037 AsyncTaskManager::TaskHelper::TaskHelper(std::unique_ptr<AsyncTaskThread> processor, AsyncTaskManager& asyncTaskManager)
1038 : mProcessor(std::move(processor)),
1039 mAsyncTaskManager(asyncTaskManager)
1043 bool AsyncTaskManager::TaskHelper::Request()
1045 return mProcessor->Request();
1048 } // namespace Adaptor
1050 } // namespace Internal
1052 /********************************************************************************/
1053 /********************************* PUBLIC CLASS *******************************/
1054 /********************************************************************************/
1056 AsyncTaskManager::AsyncTaskManager() = default;
1058 AsyncTaskManager::~AsyncTaskManager() = default;
1060 AsyncTaskManager AsyncTaskManager::Get()
1062 return Internal::Adaptor::AsyncTaskManager::Get();
1065 void AsyncTaskManager::AddTask(AsyncTaskPtr task)
1067 Internal::Adaptor::GetImplementation(*this).AddTask(task);
1070 void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
1072 Internal::Adaptor::GetImplementation(*this).RemoveTask(task);
1075 AsyncTaskManager::TasksCompletedId AsyncTaskManager::SetCompletedCallback(CallbackBase* callback, AsyncTaskManager::CompletedCallbackTraceMask mask)
1077 return Internal::Adaptor::GetImplementation(*this).SetCompletedCallback(callback, mask);
1080 bool AsyncTaskManager::RemoveCompletedCallback(AsyncTaskManager::TasksCompletedId tasksCompletedId)
1082 return Internal::Adaptor::GetImplementation(*this).RemoveCompletedCallback(tasksCompletedId);
1085 AsyncTaskManager::AsyncTaskManager(Internal::Adaptor::AsyncTaskManager* impl)
1094 namespace AsyncTaskManager
1096 void DestroyAsyncTaskManager()
1098 Dali::Internal::Adaptor::gAsyncTaskManager.Reset();
1101 void ProcessSingleCompletedTask()
1103 auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1104 Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskCompleted();
1107 void ProcessAllCompletedTask()
1109 auto asyncTaskManager = Dali::AsyncTaskManager::Get();
1110 Dali::Internal::Adaptor::GetImplementation(asyncTaskManager).TaskAllCompleted();
1112 } // namespace AsyncTaskManager