task_manager: spawn threads in BFS manner 47/312347/7
authorVibhav Aggarwal <v.aggarwal@samsung.com>
Fri, 7 Jun 2024 03:43:12 +0000 (12:43 +0900)
committerInki Dae <inki.dae@samsung.com>
Wed, 12 Jun 2024 00:57:24 +0000 (09:57 +0900)
[Issue type] code optimization

After this change, threads for each node will be spawned
only when any of its dependency nodes has finished processing.
This change greatly reduces the number of sleeping threads in
a large graph.

Change-Id: I4ce9c8bfcf3d6395f3a5cfabc5c160242faea26a
Signed-off-by: Vibhav Aggarwal <v.aggarwal@samsung.com>
services/task_manager/include/TaskManager.h
services/task_manager/src/TaskManager.cpp

index 92308c2bddac7ad927b3cc9544cc9b95d935857b..16761a0e192723df2b8e1fa57dcdd2bba8f9e221 100644 (file)
 #define __TASK_MANAGER_H__
 
 #include <vector>
+#include <queue>
+#include <unordered_set>
 #include <memory>
 #include <thread>
+#include <mutex>
 
 #include "IInferenceTaskInterface.h"
 #include "SingleoCommonTypes.h"
@@ -35,6 +38,10 @@ private:
        std::vector<std::shared_ptr<BaseDataType> > _inputs;
        std::vector<std::shared_ptr<INode> > _nodes;
        std::vector<std::shared_ptr<BaseResultType> > _results;
+       std::queue<std::shared_ptr<std::thread> > _threads;
+       std::unordered_set<std::shared_ptr<INode> > _is_thread_created;
+
+       std::mutex _thread_mutex;
 
        void threadCb(std::shared_ptr<INode> &node);
        void verifyGraph();
index 1adf72f59c70c2fe6fb6826c599bf8b448aa26e0..bb264677fcca96f441dd7116e84fc13771026d4c 100644 (file)
@@ -79,6 +79,15 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
 
        node->invoke();
        node->wakeup();
+
+       // Spawn threads for next nodes
+       for (auto &n : node->getNexts()) {
+               std::lock_guard<std::mutex> lock(_thread_mutex);
+               if (_is_thread_created.find(n) == _is_thread_created.end()) {
+                       _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+                       _is_thread_created.insert(n);
+               }
+       }
 }
 
 void TaskManager::addInput(BaseDataType &input)
@@ -214,8 +223,6 @@ void TaskManager::run()
 
        verifyGraph();
 
-       std::vector<std::unique_ptr<std::thread> > threads;
-
        auto inputBuffer = make_shared<SharedBuffer>();
 
        for (auto &i : _inputs)
@@ -234,14 +241,24 @@ void TaskManager::run()
 
                        // TODO. consider for multiple sources later.
                        n->setInputBuffer(inputBuffer);
+                       _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+                       _is_thread_created.insert(n);
                }
-
-               threads.push_back(make_unique<thread>(&TaskManager::threadCb, this, std::ref(n)));
        }
 
-       for (auto &t : threads)
-               t->join();
+       while (true) {
+               std::unique_lock<std::mutex> lock(_thread_mutex);
+               if (_threads.empty())
+                       break;
 
+               auto t = _threads.front();
+               _threads.pop();
+
+               lock.unlock();
+
+               t->join();
+       }
+       _is_thread_created.clear();
        _inputs.clear();
 }
 
@@ -260,7 +277,10 @@ vector<shared_ptr<BaseResultType> > &TaskManager::output()
 
 void TaskManager::clear()
 {
+       _inputs.clear();
        _nodes.clear();
+       _results.clear();
+       _is_thread_created.clear();
 }
 
 }