task_manager: update graph verification 74/312074/8
authorInki Dae <inki.dae@samsung.com>
Tue, 4 Jun 2024 04:13:07 +0000 (13:13 +0900)
committerInki Dae <inki.dae@samsung.com>
Tue, 11 Jun 2024 09:25:06 +0000 (18:25 +0900)
Update graph verification by adding _nexts member to check the type of
next node.

Change-Id: Id7fbac1a89d7123c9c008a781c0c3a49a29997e5
Signed-off-by: Inki Dae <inki.dae@samsung.com>
services/task_manager/include/CallbackNode.h
services/task_manager/include/INode.h
services/task_manager/include/TaskNode.h
services/task_manager/src/CallbackNode.cpp
services/task_manager/src/TaskManager.cpp
services/task_manager/src/TaskNode.cpp
test/services/test_task_manager.cpp

index 8834ce724a0dea67bf9d3e99998776fc911720c5..4f2e09b3fad7ca871427561447493cab7ff86940 100644 (file)
@@ -28,12 +28,13 @@ namespace singleo
 {
 namespace services
 {
-class CallbackNode : public INode
+class CallbackNode : public INode, public std::enable_shared_from_this<CallbackNode>
 {
 protected:
        NodeType _type { NodeType::NONE };
        std::string _name;
        std::vector<std::shared_ptr<INode> > _dependencies;
+       std::vector<std::shared_ptr<INode> > _nexts;
        NodeCb _cb;
        std::shared_ptr<SharedBuffer> _inputBuffer;
        std::shared_ptr<SharedBuffer> _outputBuffer;
@@ -43,6 +44,7 @@ protected:
        std::mutex _mutex;
 
        bool isNodeDuplicated(const std::vector<std::shared_ptr<INode> > &target, const std::shared_ptr<INode> &newNode);
+       void addNext(std::shared_ptr<INode> node) override;
 
 public:
        CallbackNode() = default;
@@ -57,6 +59,7 @@ public:
        std::shared_ptr<SharedBuffer> &getInputBuffer() override;
        void addDependency(std::shared_ptr<INode> node) override;
        std::vector<std::shared_ptr<INode> > &getDependencies() override;
+       std::vector<std::shared_ptr<INode> > &getNexts() override;
        void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) override;
        std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
        void wait() override;
index 0e0b037db8552cdcb202f14593d42b611e200792..79b525badac443a936c9fcbeebb9eb781623f0c2 100644 (file)
@@ -41,6 +41,8 @@ public:
        virtual void setInputBuffer(std::shared_ptr<SharedBuffer> inputBuffer) = 0;
        virtual std::shared_ptr<SharedBuffer> &getInputBuffer() = 0;
        virtual void addDependency(std::shared_ptr<INode> node) = 0;
+       virtual void addNext(std::shared_ptr<INode> node) = 0;
+       virtual std::vector<std::shared_ptr<INode> > &getNexts() = 0;
        virtual std::vector<std::shared_ptr<INode> > &getDependencies() = 0;
        virtual void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) = 0;
        virtual std::shared_ptr<SharedBuffer> &getOutputBuffer() = 0;
index 8bc13d6f6fd8370db72d2cbf15848fdaa3e33405..0c5f92aff1b44648607a3e2cc3ccbe2ce58bac8f 100644 (file)
@@ -28,12 +28,13 @@ namespace singleo
 {
 namespace services
 {
-class TaskNode : public INode
+class TaskNode : public INode, public std::enable_shared_from_this<TaskNode>
 {
 protected:
        NodeType _type { NodeType::NONE };
        std::string _name;
        std::vector<std::shared_ptr<INode> > _dependencies;
+       std::vector<std::shared_ptr<INode> > _nexts;
        std::shared_ptr<SharedBuffer> _inputBuffer;
        std::shared_ptr<SharedBuffer> _outputBuffer;
        bool _completed { false };
@@ -41,6 +42,7 @@ protected:
        std::mutex _mutex;
 
        bool isNodeDuplicated(const std::vector<std::shared_ptr<INode> > &target, const std::shared_ptr<INode> &newNode);
+       void addNext(std::shared_ptr<INode> node) override;
 
 public:
        TaskNode() = default;
@@ -55,6 +57,7 @@ public:
        std::shared_ptr<SharedBuffer> &getInputBuffer() override;
        void addDependency(std::shared_ptr<INode> node) override;
        std::vector<std::shared_ptr<INode> > &getDependencies() override;
+       std::vector<std::shared_ptr<INode> > &getNexts() override;
        void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) override;
        std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
        void wait() override;
index 6e8def5fc5e291635594a55f9aa1a7514b2131c1..b18241dc3e2e6cae50ecae0724d44df8caad72d3 100644 (file)
@@ -62,6 +62,7 @@ void CallbackNode::addDependency(std::shared_ptr<INode> node)
        }
 
        _dependencies.push_back(node);
+       node->addNext(shared_ptr<INode>(shared_from_this()));
 }
 
 std::vector<std::shared_ptr<INode> > &CallbackNode::getDependencies()
@@ -69,6 +70,21 @@ std::vector<std::shared_ptr<INode> > &CallbackNode::getDependencies()
        return _dependencies;
 }
 
+void CallbackNode::addNext(shared_ptr<INode> node)
+{
+       if (isNodeDuplicated(_nexts, node)) {
+               SINGLEO_LOGE("A given node has already been registered.");
+               throw InvalidOperation("A given node has already been registered.");
+       }
+
+       _nexts.push_back(node);
+}
+
+std::vector<std::shared_ptr<INode> > &CallbackNode::getNexts()
+{
+       return _nexts;
+}
+
 void CallbackNode::setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer)
 {
        outputBuffer->addRef();
index f3558df6c848c9de9ac3ae7f202d968604e25274..e123d06f2c842e7e772a3265c6353c21433c54b5 100644 (file)
@@ -110,24 +110,32 @@ void TaskManager::addNode(std::shared_ptr<INode> node)
 
 void TaskManager::verifyGraph()
 {
-       map<shared_ptr<INode>, unsigned int> degreeMap;
-
        // Start node should be a task node such as InferenceNode and TrainingNode.
        if (_nodes.front()->getType() != NodeType::INFERENCE && _nodes.front()->getType() != NodeType::TRAINING) {
                SINGLEO_LOGE("The first node should be a task node such as InferenceNode or TrainingNode.");
                throw InvalidOperation("The first node should be a task node such as InferenceNode or TrainingNode.");
        }
 
+       auto &lastNode = _nodes.back();
+
        // Last node should be EndpointNode.
-       if (_nodes.back()->getType() != NodeType::ENDPOINT) {
+       if (lastNode->getType() != NodeType::ENDPOINT) {
                SINGLEO_LOGE("The last node should be a EndpointNode.");
                throw InvalidOperation("The last node should be a EndpointNode.");
        }
 
-       // Verify if _nodes is DAG(Directed Acyclic Graph) or not.
+       map<shared_ptr<INode>, unsigned int> degreeMap;
+
+       // Verify graph rule.
        for (auto &node : _nodes) {
+               if (node->getNexts().size() == 0 && node->getType() != NodeType::ENDPOINT) {
+                       SINGLEO_LOGE("Last node should be endpoint node.");
+                       throw InvalidOperation("Last node should be endpoint node.");
+               }
+
                if (node->getType() == NodeType::BRIDGE) {
                        // Verify if BridgeNode is located bewteen two task nodes such as InferenceNode and TrainingNode.
+                       // ... inference or training node ---- bridge node ---- inference or training node ...
                        for (auto &d_node : node->getDependencies()) {
                                degreeMap[d_node]++;
 
@@ -139,18 +147,34 @@ void TaskManager::verifyGraph()
                                }
                        }
                } else if (node->getType() == NodeType::INFERENCE || node->getType() == NodeType::TRAINING) {
-                       // Verify if InferenceNode or TrainingNode is located prior to BridgeNode.
+                       // Verify if InferenceNode or TrainingNode is located in front of BridgeNode, behind BridgeNode or in front of EndpointNode.
+                       // ... inference or training node ---- bridge node ...
+                       // ... bridge node ---- inference or training node ...
+                       // ... inference or training node ---- endpoint node
                        for (auto &d_node : node->getDependencies()) {
                                degreeMap[d_node]++;
 
                                if (d_node->getType() != NodeType::BRIDGE) {
-                                       SINGLEO_LOGE("The InferenceNode or TrainingNode should be located prior to BridgeNode.");
-                                       throw InvalidOperation("The InferenceNode or TrainingNode should be located prior to BridgeNode.");
+                                       SINGLEO_LOGE(
+                                                       "The InferenceNode or TrainingNode should be located in front of BridgeNode, behind BridgeNode or in front of EndpointNode.");
+                                       throw InvalidOperation(
+                                                       "The InferenceNode or TrainingNode should be located in front of BridgeNode, behind BridgeNode or in front of EndpointNode.");
+                               }
+                       }
+               } else if (node->getType() == NodeType::ENDPOINT) {
+                       // ... inference or training node ---- endpoint node
+                       for (auto &d_node : node->getDependencies()) {
+                               degreeMap[d_node]++;
+
+                               if (d_node->getType() != NodeType::INFERENCE && d_node->getType() != NodeType::TRAINING) {
+                                       SINGLEO_LOGE("The EndpointNode should be located behind InferenceNode or TrainingNode.");
+                                       throw InvalidOperation("The EndpointNode should be located behind InferenceNode or TrainingNode.");
                                }
                        }
                }
        }
 
+       // Verify if _nodes is DAG(Directed Acyclic Graph) or not.
        queue<shared_ptr<INode> > zeroDegreeQ;
 
        for (auto &node : _nodes)
index 6ee98bef5de38cb10c1c4f3d69d8dd198210f0c0..2ae2cb57724353404fde3eb3143187ff0b14dd64 100644 (file)
@@ -56,6 +56,7 @@ void TaskNode::addDependency(std::shared_ptr<INode> node)
        }
 
        _dependencies.push_back(node);
+       node->addNext(shared_ptr<INode>(shared_from_this()));
 }
 
 std::vector<std::shared_ptr<INode> > &TaskNode::getDependencies()
@@ -63,6 +64,21 @@ std::vector<std::shared_ptr<INode> > &TaskNode::getDependencies()
        return _dependencies;
 }
 
+void TaskNode::addNext(shared_ptr<INode> node)
+{
+       if (isNodeDuplicated(_nexts, node)) {
+               SINGLEO_LOGE("A given node has already been registered.");
+               throw InvalidOperation("A given node has already been registered.");
+       }
+
+       _nexts.push_back(node);
+}
+
+std::vector<std::shared_ptr<INode> > &TaskNode::getNexts()
+{
+       return _nexts;
+}
+
 void TaskNode::setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer)
 {
        outputBuffer->addRef();
index 48db6bb1a6b76742bb0e8b9f6c7f7cf45cdb6cfc..4d8432c6271000d9218b7340a387d5f1995b3e4d 100644 (file)
@@ -104,7 +104,6 @@ TEST(SingloTaskManager, MultipleNodesBasedGraphAShouldWork)
                taskManager->addNode(face_landmark_node);
 
                auto endpoint_node = make_shared<EndpointNode>();
-               endpoint_node->setCb(nullptr);
                endpoint_node->addDependency(face_landmark_node);
                taskManager->addNode(endpoint_node);
 
@@ -172,7 +171,6 @@ TEST(SingloTaskManager, MultipleNodesBasedGraphBShouldWork)
                taskManager->addNode(face_landmark_node);
 
                auto endpoint_node = make_shared<EndpointNode>();
-               endpoint_node->setCb(nullptr);
                endpoint_node->addDependency(face_landmark_node);
                taskManager->addNode(endpoint_node);