task_manager: reintroduce topologicalSorting() 10/315410/7
authorInki Dae <inki.dae@samsung.com>
Mon, 2 Dec 2024 02:13:59 +0000 (11:13 +0900)
committerInki Dae <inki.dae@samsung.com>
Wed, 4 Dec 2024 06:26:51 +0000 (06:26 +0000)
Reintroduce topologicalSorting(). We have separated verifyGraph()
and topologicalSorting() from run() so no overhead and even we can
drop unnecessary interation for finding a last node in output function.

The reason I introduce topologicalSorting function are,
- to show actual pipeline path to user in runtime for debugging
- to add multiple pipelines later
  (as of now, only one _nodes exists but _nodes will be created by
   user request)

Change-Id: Idc389493a8b73d67a9b17ac26516014704501ed2
Signed-off-by: Inki Dae <inki.dae@samsung.com>
services/task_manager/include/TaskManager.h
services/task_manager/src/TaskManager.cpp

index 95aeefc1ad0ba72d50f1c39b18a6b04cd3efd1cf..606f17ff230ebf01206eebc9337a1e8b1ae4a007 100644 (file)
@@ -37,17 +37,19 @@ class TaskManager
 private:
        std::vector<std::shared_ptr<BaseDataType> > _inputs;
        std::vector<INode *> _nodes;
+       std::vector<INode *> _active_nodes;
        std::map<std::string, INode *> _nodes_map;
        std::vector<std::shared_ptr<BaseResultType> > _results;
        std::queue<std::shared_ptr<std::thread> > _threads;
        std::unordered_set<INode *> _is_thread_created;
-       bool _is_graph_verifed { false };
+       bool _is_graph_verified { false };
 
        std::mutex _thread_mutex;
 
        void addNode(INode *node);
        void threadCb(INode *node);
        void verifyGraph();
+       void topologicalSorting();
        bool isNodeDuplicated(const std::vector<INode *> &target, const INode *newNode);
 
 public:
index a56c2902132e02be1ef52ff80894257781a11c1f..d7efb6e2e79b1c3202dc92d29b4809ead329caca 100644 (file)
@@ -78,7 +78,7 @@ INode *TaskManager::requestNewNode(NodeType type, std::string nodeName)
 
 void TaskManager::setBypass(const std::string &node_name, bool bypass, bool clearResult)
 {
-       if (_nodes.empty())
+       if (_active_nodes.empty())
                return;
 
        auto &node = _nodes_map[node_name];
@@ -178,9 +178,53 @@ void TaskManager::addNode(INode *node)
        _nodes_map[node->getName()] = node;
 }
 
+void TaskManager::topologicalSorting()
+{
+       queue<INode *> Q;
+
+       INode *startpoint_node {};
+       for (auto &node : _nodes) {
+               if (node->getType() == NodeType::STARTPOINT) {
+                       startpoint_node = node;
+                       break;
+               }
+       }
+
+       Q.push(startpoint_node);
+
+       INode *endpoint_node {};
+
+       while (!Q.empty()) {
+               auto node = Q.front();
+               Q.pop();
+
+               if (node->getType() != NodeType::ENDPOINT)
+                       if (!isNodeDuplicated(_active_nodes, node))
+                               _active_nodes.push_back(node);
+
+               if (node->getType() == NodeType::ENDPOINT)
+                       endpoint_node = node;
+
+               for (auto &next : node->getNexts())
+                       Q.push(next);
+       }
+
+       if (!isNodeDuplicated(_active_nodes, endpoint_node))
+               _active_nodes.push_back(endpoint_node);
+
+       string pipeline_path;
+
+       for (auto &node : _active_nodes)
+               pipeline_path += node->getName() + " -> ";
+       pipeline_path += "end";
+
+       SINGLEO_LOGD("pipeline: %s", pipeline_path.c_str());
+}
+
 void TaskManager::verifyGraph()
 {
-       // Check if node names are unique
+       // Check if node names are unique and startpoint node exists
+       bool startpoint_exists = false;
        unordered_set<string> node_names;
        for (auto &n : _nodes) {
                if (node_names.find(n->getName()) != node_names.end()) {
@@ -188,7 +232,16 @@ void TaskManager::verifyGraph()
                        throw InvalidOperation("Node names must be unique.");
                }
                node_names.insert(n->getName());
+
+               if (n->getType() == NodeType::STARTPOINT)
+                       startpoint_exists = true;
+       }
+
+       if (!startpoint_exists) {
+               SINGLEO_LOGE("Startpoint node does not exist.");
+               throw InvalidOperation("Startpoint node does not exist.");
        }
+
        node_names.clear();
 
        map<INode *, unsigned int> degreeMap;
@@ -273,12 +326,13 @@ void TaskManager::verifyGraph()
                throw InvalidOperation("Invalid graph.");
        }
 
-       _is_graph_verifed = true;
+       _is_graph_verified = true;
 }
 
 void TaskManager::verify()
 {
        verifyGraph();
+       topologicalSorting();
 }
 
 void TaskManager::run()
@@ -288,7 +342,7 @@ void TaskManager::run()
                throw InvalidOperation("No input source.");
        }
 
-       if (!_is_graph_verifed) {
+       if (!_is_graph_verified) {
                SINGLEO_LOGE("Graph is not verified.");
                throw InvalidOperation("Graph is not verified.");
        }
@@ -298,7 +352,7 @@ void TaskManager::run()
        for (auto &i : _inputs)
                inputBuffer->addInput(i);
 
-       for (auto &n : _nodes) {
+       for (auto &n : _active_nodes) {
                // Do not run startpoint node which is just empty node.
                if (n->getType() == NodeType::STARTPOINT)
                        continue;
@@ -342,24 +396,19 @@ void TaskManager::run()
        // the result has been returned to user so clear each node.
        // Ps. clear() of each node should be called after graph pipeline is completed.
        //     because SharedBuffer can be shared between nodes.
-       for (auto &node : _nodes)
+       for (auto &node : _active_nodes)
                node->clear();
 }
 
 vector<shared_ptr<BaseResultType> > &TaskManager::output()
 {
-       if (_nodes.empty()) {
+       if (_active_nodes.empty()) {
                SINGLEO_LOGE("Node is not set");
                throw InvalidOperation("Node is not set");
        }
 
        _results.clear();
-       for (auto &node : _nodes) {
-               if (node->getType() == NodeType::ENDPOINT) {
-                       _results = node->results();
-                       break;
-               }
-       }
+       _results = _active_nodes.back()->results();
 
        return _results;
 }
@@ -371,11 +420,12 @@ void TaskManager::resetPipeline()
                node->clearDependency();
 
        _nodes.clear();
+       _active_nodes.clear();
 }
 
 void TaskManager::addEdge(const string &node_a_name, const string &node_b_name)
 {
-       if (_is_graph_verifed) {
+       if (_is_graph_verified) {
                SINGLEO_LOGE("Graph has been freezed.");
                throw InvalidOperation("Graph has been freezed.");
        }
@@ -408,9 +458,10 @@ void TaskManager::clear()
                delete node;
        }
        _nodes.clear();
+       _active_nodes.clear();
        _results.clear();
        _is_thread_created.clear();
-       _is_graph_verifed = false;
+       _is_graph_verified = false;
 
        SharedBuffer::checkMemoryLeak();
 }