The meaning of the running task's second value (the "cleared" flag) was unclear. Use an enum value
instead of a single boolean.
Also, execute the worker thread provider only if it is valid.
Change-Id: Ic6af332774fe008fd010c11510d68462762c0e34
Signed-off-by: Eunki, Hong <eunkiki.hong@samsung.com>
#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>
#include <dali/devel-api/common/singleton-service.h>
#include <dali/integration-api/adaptor-framework/adaptor.h>
#include <dali/integration-api/debug.h>
-#include <dali/public-api/common/vector-wrapper.h>
auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
constexpr auto MAX_NUMBER_OF_THREADS = 16u;
auto numberString = EnvironmentVariable::GetEnvironmentVariable(environmentVariable);
auto numberOfThreads = numberString ? std::strtoul(numberString, nullptr, 10) : 0;
constexpr auto MAX_NUMBER_OF_THREADS = 16u;
- DALI_ASSERT_DEBUG(numberOfThreads < MAX_NUMBER_OF_THREADS);
- return (numberOfThreads > 0 && numberOfThreads < MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
+ DALI_ASSERT_DEBUG(numberOfThreads <= MAX_NUMBER_OF_THREADS);
+ return (numberOfThreads > 0 && numberOfThreads <= MAX_NUMBER_OF_THREADS) ? numberOfThreads : defaultValue;
}
size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
}
size_t GetNumberOfLowPriorityThreads(const char* environmentVariable, size_t defaultValue, size_t maxValue)
template<typename CacheContainer, typename Iterator>
static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
{
template<typename CacheContainer, typename Iterator>
static void InsertTaskCache(CacheContainer& cacheMap, AsyncTaskPtr task, Iterator iterator)
{
- cacheMap[task.Get()].push_back(iterator);
+ auto& cacheContainer = cacheMap[task.Get()]; // Get or Create cache container.
+ cacheContainer.insert(cacheContainer.end(), iterator);
public:
AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
public:
AsyncTaskManager& mManager; ///< Owner of this CacheImpl.
- // TODO : Can't we use std::set instead of std::vector?
- // It will be slowdown If someone AddTask multiple times for same task.
- using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncTaskContainer::iterator>>;
- using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::vector<AsyncRunningTaskContainer::iterator>>;
+ // Keep cache iterators as list since we take tasks by FIFO as default.
+ using TaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncTaskContainer::iterator>>;
+ using RunningTaskCacheContainer = std::unordered_map<const AsyncTask*, std::list<AsyncRunningTaskContainer::iterator>>;
TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
TaskCacheContainer mWaitingTasksCache; ///< The cache of tasks and iterator for waiting to async process. Must be locked under mWaitingTasksMutex.
RunningTaskCacheContainer mRunningTasksCache; ///< The cache of tasks and iterator for running tasks. Must be locked under mRunningTasksMutex.
{
if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
{
if(mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
+ mProcessorRegistered = false;
Dali::Adaptor::Get().UnregisterProcessor(*this);
}
Dali::Adaptor::Get().UnregisterProcessor(*this);
}
// If all threads are busy, then it's ok just to push the task because they will try to get the next job.
}
// If all threads are busy, then it's ok just to push the task because they will try to get the next job.
}
+  // Register Process (mTrigger may execute too late if the event thread is running a lot of events.)
if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
Dali::Adaptor::Get().RegisterProcessor(*this);
if(!mProcessorRegistered && Dali::Adaptor::IsAvailable())
{
Dali::Adaptor::Get().RegisterProcessor(*this);
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
// Check whether we need to unregister processor.
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "RemoveTask [%p]\n", task.Get());
// Check whether we need to unregister processor.
- // If there is some non-empty queue exist, we don't need to check unregister.
+ // If there is some non-empty queue exist, we don't need to unregister processor.
bool needCheckUnregisterProcessor = true;
{
bool needCheckUnregisterProcessor = true;
{
{
for(auto iterator : mapIter->second)
{
{
for(auto iterator : mapIter->second)
{
+ DALI_ASSERT_DEBUG((*iterator) == task);
if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
{
// Decrease the number of waiting tasks for high priority.
if((*iterator)->GetPriorityType() == AsyncTask::PriorityType::HIGH)
{
// Decrease the number of waiting tasks for high priority.
{
for(auto iterator : mapIter->second)
{
{
for(auto iterator : mapIter->second)
{
- // We cannot erase container. Just mark as erased.
+ DALI_ASSERT_DEBUG((*iterator).first == task);
+ // We cannot erase container. Just mark as canceled.
// Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
// Note : mAvaliableLowPriorityTaskCounts will be increased after process finished.
- iterator->second = true;
+ (*iterator).second = RunningTaskState::CANCELED;
- CacheImpl::EraseAllTaskCache(mCacheImpl->mRunningTasksCache, task);
}
if(!mRunningTasks.empty())
}
if(!mRunningTasks.empty())
{
for(auto iterator : mapIter->second)
{
{
for(auto iterator : mapIter->second)
{
+ DALI_ASSERT_DEBUG((*iterator) == task);
mCompletedTasks.erase(iterator);
}
CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
mCompletedTasks.erase(iterator);
}
CacheImpl::EraseAllTaskCache(mCacheImpl->mCompletedTasksCache, task);
- // UnregisterProcessor required to lock mutex. Call that API only if required.
+ // UnregisterProcessor required to lock mutex. Call this API only if required.
if(needCheckUnregisterProcessor)
{
UnregisterProcessor();
if(needCheckUnregisterProcessor)
{
UnregisterProcessor();
// Keep processor at least 1 task exist.
// Please be careful the order of mutex, to avoid dead lock.
// TODO : Should we lock all mutex rightnow?
// Keep processor at least 1 task exist.
// Please be careful the order of mutex, to avoid dead lock.
// TODO : Should we lock all mutex rightnow?
- Mutex::ScopedLock lock1(mWaitingTasksMutex);
+ Mutex::ScopedLock lockWait(mWaitingTasksMutex);
if(mWaitingTasks.empty())
{
if(mWaitingTasks.empty())
{
- Mutex::ScopedLock lock2(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
+ Mutex::ScopedLock lockRunning(mRunningTasksMutex); // We can lock this mutex under mWaitingTasksMutex.
if(mRunningTasks.empty())
{
if(mRunningTasks.empty())
{
- Mutex::ScopedLock lock3(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
+ Mutex::ScopedLock lockComplete(mCompletedTasksMutex); // We can lock this mutex under mWaitingTasksMutex and mRunningTasksMutex.
if(mCompletedTasks.empty())
{
if(mCompletedTasks.empty())
{
- Dali::Adaptor::Get().UnregisterProcessor(*this);
mProcessorRegistered = false;
mProcessorRegistered = false;
+ Dali::Adaptor::Get().UnregisterProcessor(*this);
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
CallbackBase::Execute(*(task->GetCompletedCallback()), task);
}
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback [%p]\n", task.Get());
CallbackBase::Execute(*(task->GetCompletedCallback()), task);
}
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "TasksCompleted end\n");
}
void AsyncTaskManager::Process(bool postProcessor)
}
void AsyncTaskManager::Process(bool postProcessor)
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Waiting -> Running [%p]\n", nextTask.Get());
- auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, false));
+ auto runningIter = mRunningTasks.insert(mRunningTasks.end(), std::make_pair(nextTask, RunningTaskState::RUNNING));
CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
// Decrease avaliable task counts if it is low priority
CacheImpl::InsertTaskCache(mCacheImpl->mRunningTasksCache, nextTask, runningIter);
// Decrease avaliable task counts if it is low priority
/// Worker thread called
void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
{
/// Worker thread called
void AsyncTaskManager::CompleteTask(AsyncTaskPtr task)
{
// Lock while adding task to the queue
// Lock while adding task to the queue
{
Mutex::ScopedLock lock(mRunningTasksMutex);
{
Mutex::ScopedLock lock(mRunningTasksMutex);
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p]\n", task.Get());
-
- // Note : The number of mRunningTasks size will not be over than thread count. Just linear iterate.
- for(auto iter = mRunningTasks.begin(), endIter = mRunningTasks.end(); iter != endIter; ++iter)
+ auto mapIter = mCacheImpl->mRunningTasksCache.find(task.Get());
+ if(mapIter != mCacheImpl->mRunningTasksCache.end())
- if((*iter).first == task)
+ const auto cacheIter = mapIter->second.begin();
+ DALI_ASSERT_ALWAYS(cacheIter != mapIter->second.end());
+
+ const auto iter = *cacheIter;
+ DALI_ASSERT_DEBUG(iter->first == task);
+ if(iter->second == RunningTaskState::RUNNING)
- if(!(*iter).second)
- {
- if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
- {
- Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
+ // This task is valid.
+ notify = true;
+ }
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
+ const auto priorityType = iter->first->GetPriorityType();
+ // Increase avaliable task counts if it is low priority
+ if(priorityType == AsyncTask::PriorityType::LOW)
+ {
+ // We are under running task mutex. We can increase it.
+ ++mAvaliableLowPriorityTaskCounts;
+ }
+ CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
+ mRunningTasks.erase(iter);
+ }
- auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
- CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
- }
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "CompleteTask [%p] (is notify? : %d)\n", task.Get(), notify);
- CacheImpl::EraseTaskCache(mCacheImpl->mRunningTasksCache, task, iter);
- }
+    // We should move the task to the completed tasks under mRunningTaskMutex.
+ if(notify && task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+ {
+ Mutex::ScopedLock lock(mCompletedTasksMutex); // We can lock this mutex under mRunningTasksMutex.
- // Increase avaliable task counts if it is low priority
- const auto priorityType = task->GetPriorityType();
- if(priorityType == AsyncTask::PriorityType::LOW)
- {
- // We are under running task mutex. We can increase it.
- ++mAvaliableLowPriorityTaskCounts;
- }
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Running -> Completed [%p]\n", task.Get());
- // Delete this task in running queue
- mRunningTasks.erase(iter);
- break;
- }
+ auto completedIter = mCompletedTasks.insert(mCompletedTasks.end(), task);
+ CacheImpl::InsertTaskCache(mCacheImpl->mCompletedTasksCache, task, completedIter);
- // wake up the main thread
- if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+ // We should execute this tasks complete callback out of mutex
+ if(notify)
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
- mTrigger->Trigger();
- }
- else
- {
- DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
- CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+ if(task->GetCallbackInvocationThread() == AsyncTask::ThreadType::MAIN_THREAD)
+ {
+ // wake up the main thread
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Trigger main thread\n");
+ mTrigger->Trigger();
+ }
+ else // task->GetCallbackInvocationThread() == AsyncTask::ThreadType::WORKER_THREAD
+ {
+ DALI_LOG_INFO(gAsyncTasksManagerLogFilter, Debug::Verbose, "Execute callback on worker thread [%p]\n", task.Get());
+ CallbackBase::Execute(*(task->GetCompletedCallback()), task);
+ }
*/
void TasksCompleted();
*/
void TasksCompleted();
- /**
- * @copydoc Dali::Integration::Processor::Process()
- */
- void Process(bool postProcessor) override;
-
public: // Worker thread called method
/**
* Pop the next task out from the queue.
public: // Worker thread called method
/**
* Pop the next task out from the queue.
*/
void CompleteTask(AsyncTaskPtr task);
*/
void CompleteTask(AsyncTaskPtr task);
+protected: // Implementation of Processor
+ /**
+ * @copydoc Dali::Integration::Processor::Process()
+ */
+ void Process(bool postProcessor) override;
+
private:
/**
* @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
private:
/**
* @brief Helper class to keep the relation between AsyncTaskThread and corresponding container
AsyncTaskManager& mAsyncTaskManager;
};
AsyncTaskManager& mAsyncTaskManager;
};
+ /**
+ * @brief State of running task
+ */
+ enum RunningTaskState
+ {
+ RUNNING = 0, ///< Running task
+ CANCELED = 1, ///< Canceled by user
+ };
+
private:
// Undefined
AsyncTaskManager(const AsyncTaskManager& manager);
private:
// Undefined
AsyncTaskManager(const AsyncTaskManager& manager);
AsyncTaskManager& operator=(const AsyncTaskManager& manager);
private:
AsyncTaskManager& operator=(const AsyncTaskManager& manager);
private:
+ // Keep Task as list since we take tasks by FIFO as default.
using AsyncTaskContainer = std::list<AsyncTaskPtr>;
using AsyncTaskContainer = std::list<AsyncTaskPtr>;
- using AsyncTaskPair = std::pair<AsyncTaskPtr, bool>;
+ using AsyncTaskPair = std::pair<AsyncTaskPtr, RunningTaskState>;
using AsyncRunningTaskContainer = std::list<AsyncTaskPair>;
AsyncTaskContainer mWaitingTasks; ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
using AsyncRunningTaskContainer = std::list<AsyncTaskPair>;
AsyncTaskContainer mWaitingTasks; ///< The queue of the tasks waiting to async process. Must be locked under mWaitingTasksMutex.
std::unique_ptr<CacheImpl> mCacheImpl; ///< Cache interface for AsyncTaskManager.
std::unique_ptr<EventThreadCallback> mTrigger;
std::unique_ptr<CacheImpl> mCacheImpl; ///< Cache interface for AsyncTaskManager.
std::unique_ptr<EventThreadCallback> mTrigger;
- bool mProcessorRegistered;
+
+ bool mProcessorRegistered : 1;
};
} // namespace Adaptor
};
} // namespace Adaptor