Make enum for RunningTaskState + Clean code 15/290615/7
authorEunki, Hong <eunkiki.hong@samsung.com>
Wed, 29 Mar 2023 11:30:01 +0000 (20:30 +0900)
committerEunki Hong <eunkiki.hong@samsung.com>
Sat, 1 Apr 2023 03:26:42 +0000 (12:26 +0900)
The running task's second value was not cleared means. Let we use enum value
instead of single boolean.

And also, let we excute Worker thread provider only if it is valid

Change-Id: Ic6af332774fe008fd010c11510d68462762c0e34
Signed-off-by: Eunki, Hong <eunkiki.hong@samsung.com>
dali/internal/system/common/async-task-manager-impl.cpp
dali/internal/system/common/async-task-manager-impl.h

index 16f44baa3b3a86bf10806c2e4fa38476d38c2c21..eeb20a644acfb3606a5997fead87bcb976ad7c6a 100644 (file)
@@ -24,7 +24,6 @@
 #include <dali/devel-api/common/singleton-service.h>
 #include <dali/integration-api/adaptor-framework/adaptor.h>
 #include <dali/integration-api/debug.h>
-#include <dali/public-api/common/vector-wrapper.h>
 
 #include <unordered_map>
 
@@ -48,8 +47,8 @@ size_t GetNumberOfThreads(const char* environmentVariable, size_t defaultValue)
   auto           numberString          = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
   auto           numberOfThreads       = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
   constexpr auto MAX_NUMBER_OF_THREADS = 16u;
-  DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
-  return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
+  DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
+  return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
 }
 
 size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
@@ -177,7 +176,8 @@ public:
   template<typename CacheContainer, typename Iterator>
   static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
   {
-    cacheMap[task.Get()].push_back(iterator);
+    auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
+    cacheContainer.insert(cacheContainer.end(), iterator);
   }
 
   /**
@@ -221,10 +221,9 @@ public:
 public:
   AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
 
-  // TODO : Can't we use std::set instead of std::vector?
-  // It will be slowdown If someone AddTask multiple times for same task.
-  using TaskCacheContainer        = std::unordered_map<const AsyncTask*, std::vector<AsyncTaskContainer::iterator>>;
-  using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncRunningTaskContainer::iterator>>;
+  // Keep cache iterators as list since we take tasks by FIFO as default.
+  using TaskCacheContainer        = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
+  using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
 
   TaskCacheContainer        mWaitingTasksCache;   ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
   RunningTaskCacheContainer mRunningTasksCache;   ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
@@ -272,6 +271,7 @@ AsyncTaskManager::~AsyncTaskManager()
 {
   if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
   {
+    mProcessorRegistered = false;
     Dali::Adaptor::Get().UnregisterProcessor(*this);
   }
 
@@ -322,6 +322,7 @@ void AsyncTaskManager::AddTask(AsyncTaskPtr task)
     // If all threads are busy, then it's ok just to push the task because they will try to get the next job.
   }
 
+  // Register Process (Since mTrigger execute too late timing if event thread running a lots of events.)
   if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
   {
     Dali::Adaptor::Get().RegisterProcessor(*this);
@@ -338,7 +339,7 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
 
     // Check whether we need to unregister processor.
-    // If there is some non-empty queue exist, we don't need to check unregister.
+    // If there is some non-empty queue exist, we don't need to unregister processor.
     bool needCheckUnregisterProcessor = true;
 
     {
@@ -350,6 +351,7 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
       {
         for(auto iterator : mapIter->second)
         {
+          DALI_ASSERT_DEBUG((*iterator) == task);
           if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
           {
             // Decrease the number of waiting tasks for high priority.
@@ -375,11 +377,11 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
       {
         for(auto iterator : mapIter->second)
         {
-          // We cannot erase container. Just mark as erased.
+          DALI_ASSERT_DEBUG((*iterator).first == task);
+          // We cannot erase container. Just mark as canceled.
           // Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
-          iterator->second = true;
+          (*iterator).second = RunningTaskState::CANCELED;
         }
-        CacheImpl::EraseAllTaskCache(mCacheImpl->mRunningTasksCache, task);
       }
 
       if(!mRunningTasks.empty())
@@ -397,6 +399,7 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
       {
         for(auto iterator : mapIter->second)
         {
+          DALI_ASSERT_DEBUG((*iterator) == task);
           mCompletedTasks.erase(iterator);
         }
         CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
@@ -408,7 +411,7 @@ void AsyncTaskManager::RemoveTask(AsyncTaskPtr task)
       }
     }
 
-    // UnregisterProcessor required to lock mutex. Call that API only if required.
+    // UnregisterProcessor required to lock mutex. Call this API only if required.
     if(needCheckUnregisterProcessor)
     {
       UnregisterProcessor();
@@ -446,17 +449,17 @@ void AsyncTaskManager::UnregisterProcessor()
     // Keep processor at least 1 task exist.
     // Please be careful the order of mutex, to avoid dead lock.
     // TODO : Should we lock all mutex rightnow?
-    Mutex::ScopedLock lock1(mWaitingTasksMutex);
+    Mutex::ScopedLock lockWait(mWaitingTasksMutex);
     if(mWaitingTasks.empty())
     {
-      Mutex::ScopedLock lock2(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+      Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
       if(mRunningTasks.empty())
       {
-        Mutex::ScopedLock lock3(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
+        Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
         if(mCompletedTasks.empty())
         {
-          Dali::Adaptor::Get().UnregisterProcessor(*this);
           mProcessorRegistered = false;
+          Dali::Adaptor::Get().UnregisterProcessor(*this);
         }
       }
     }
@@ -472,9 +475,9 @@ void AsyncTaskManager::TasksCompleted()
     DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
     CallbackBase::Execute(*(task->GetCompletedCallback()), task);
   }
-  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
 
   UnregisterProcessor();
+  DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
 }
 
 void AsyncTaskManager::Process(bool postProcessor)
@@ -531,7 +534,7 @@ AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
 
           DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
 
-          auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, false));
+          auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
           CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
 
           // Decrease avaliable task counts if it is low priority
@@ -563,57 +566,66 @@ AsyncTaskPtr AsyncTaskManager::PopNextTaskToProcess()
 /// Worker thread called
 void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
 {
+  bool notify = false;
+
   // Lock while adding task to the queue
+  if(task)
   {
     Mutex::ScopedLock lock(mRunningTasksMutex);
 
-    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p]\n", task.Get());
-
-    // Note : The number of mRunningTasks size will not be over than thread count. Just linear iterate.
-    for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
+    auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+    if(mapIter != mCacheImpl->mRunningTasksCache.end())
     {
-      if((*iter).first == task)
+      const auto cacheIter = mapIter->second.begin();
+      DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
+
+      const auto iter = *cacheIter;
+      DALI_ASSERT_DEBUG(iter->first == task);
+      if(iter->second == RunningTaskState::RUNNING)
       {
-        if(!(*iter).second)
-        {
-          if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
-          {
-            Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+        // This task is valid.
+        notify = true;
+      }
 
-            DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
+      const auto priorityType = iter->first->GetPriorityType();
+      // Increase avaliable task counts if it is low priority
+      if(priorityType == AsyncTask::PriorityType::LOW)
+      {
+        // We are under running task mutex. We can increase it.
+        ++mAvaliableLowPriorityTaskCounts;
+      }
+      CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
+      mRunningTasks.erase(iter);
+    }
 
-            auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
-            CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
-          }
+    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
 
-          CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
-        }
+    // We should move the task to compeleted task under mRunningTaskMutex.
+    if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+    {
+      Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
 
-        // Increase avaliable task counts if it is low priority
-        const auto priorityType = task->GetPriorityType();
-        if(priorityType == AsyncTask::PriorityType::LOW)
-        {
-          // We are under running task mutex. We can increase it.
-          ++mAvaliableLowPriorityTaskCounts;
-        }
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
 
-        // Delete this task in running queue
-        mRunningTasks.erase(iter);
-        break;
-      }
+      auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
+      CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
     }
   }
 
-  // wake up the main thread
-  if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+  // We should execute this tasks complete callback out of mutex
+  if(notify)
   {
-    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
-    mTrigger->Trigger();
-  }
-  else
-  {
-    DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
-    CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+    if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+    {
+      // wake up the main thread
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
+      mTrigger->Trigger();
+    }
+    else // task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD
+    {
+      DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
+      CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+    }
   }
 }
 
index c33f27551c1725a77cac3b2baa9780ee2330b408..8a67d53b9db28a50448c052e96c3c7a9a5b4310b 100644 (file)
@@ -134,11 +134,6 @@ public:
    */
   void TasksCompleted();
 
-  /**
-   * @copydoc Dali::Integration::Processor::Process()
-   */
-  void Process(bool postProcessor) override;
-
 public: // Worker thread called method
   /**
    * Pop the next task out from the queue.
@@ -154,6 +149,12 @@ public: // Worker thread called method
    */
   void CompleteTask(AsyncTaskPtr task);
 
+protected: // Implementation of Processor
+  /**
+   * @copydoc Dali::Integration::Processor::Process()
+   */
+  void Process(bool postProcessor) override;
+
 private:
   /**
    * @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
@@ -192,6 +193,15 @@ private:
     AsyncTaskManager&                mAsyncTaskManager;
   };
 
+  /**
+   * @brief State of running task
+   */
+  enum RunningTaskState
+  {
+    RUNNING  = 0, ///< Running task
+    CANCELED = 1, ///< Canceled by user
+  };
+
 private:
   // Undefined
   AsyncTaskManager(const AsyncTaskManager& manager);
@@ -200,9 +210,10 @@ private:
   AsyncTaskManager& operator=(const AsyncTaskManager& manager);
 
 private:
+  // Keep Task as list since we take tasks by FIFO as default.
   using AsyncTaskContainer = std::list<AsyncTaskPtr>;
 
-  using AsyncTaskPair             = std::pair<AsyncTaskPtr, bool>;
+  using AsyncTaskPair             = std::pair<AsyncTaskPtr, RunningTaskState>;
   using AsyncRunningTaskContainer = std::list<AsyncTaskPair>;
 
   AsyncTaskContainer        mWaitingTasks;   ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
@@ -226,7 +237,8 @@ private:
   std::unique_ptr<CacheImpl> mCacheImpl; ///< Cache interface for AsyncTaskManager.
 
   std::unique_ptr<EventThreadCallback> mTrigger;
-  bool                                 mProcessorRegistered;
+
+  bool mProcessorRegistered : 1;
 };
 
 } // namespace Adaptor