task_manager: verify if _nodes is valid graph or not 19/311519/5
authorInki Dae <inki.dae@samsung.com>
Wed, 22 May 2024 06:05:48 +0000 (15:05 +0900)
committerInki Dae <inki.dae@samsung.com>
Thu, 23 May 2024 04:29:22 +0000 (13:29 +0900)
Verify if _nodes is valid graph or not by checking
- DAG(Direct Acyclic Graph)
- Start node should be a task node such as InferenceNode and TrainingNode.
- Last node should be a EndpointNode.
- BridgeNode should be located bewteen two task nodes such as InferenceNode
  and TrainingNode.
- BridgeNode should have a valid callback.

As for DAG verification, this patch uses BFS(Breadth-first search) algorithm.

Change-Id: I33edf100fa81a335ab094b6cfff512c1d2cd749e
Signed-off-by: Inki Dae <inki.dae@samsung.com>
services/task_manager/include/TaskManager.h
services/task_manager/src/TaskManager.cpp

index 145d923fe7c8167b1097877598041fcc64f911b3..09b3490a6573af0594c3a7e9dfd43053321d58f2 100644 (file)
@@ -37,6 +37,7 @@ private:
        std::vector<std::shared_ptr<BaseResultType> > _results;
 
        void threadCb(std::shared_ptr<INode> &node);
+       void verifyGraph();
 
 public:
        TaskManager() = default;
index 3b47449c6a0327861429d61f5349dd33469c28b2..d110e6c5d238c98754e90120e4d21c7c22f70ff2 100644 (file)
@@ -15,6 +15,9 @@
  */
 
 #include <cstring>
+#include <queue>
+#include <map>
+
 #include "SingleoException.h"
 #include "SingleoLog.h"
 #include "TaskManager.h"
@@ -79,11 +82,6 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
        auto callbackNode = dynamic_pointer_cast<CallbackNode>(node);
 
        callbackNode->clearResults();
-       auto cb = callbackNode->getCb();
-       if (node->getType() == NodeType::BRIDGE && !cb) {
-               SINGLEO_LOGE("Bridge node needs callback.");
-               throw InvalidOperation("Bridge node needs callback.");
-       }
 
        for (auto &n : node->getDependencies()) {
                if (n->getType() != NodeType::INFERENCE) {
@@ -116,6 +114,7 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
        // Call the callback function registered in this callback node.
        // In this callback, new data should be set by calling node->setOutput()
        // if new data is made. TODO. consider for multiple inputs later.
+       auto cb = callbackNode->getCb();
        if (cb)
                cb(node);
 
@@ -147,6 +146,79 @@ void TaskManager::addNode(std::shared_ptr<INode> node)
        }
 }
 
+void TaskManager::verifyGraph()
+{
+       map<shared_ptr<INode>, unsigned int> degreeMap;
+
+       // Start node should be a task node such as InferenceNode and TrainingNode.
+       if (_nodes.front()->getType() != NodeType::INFERENCE && _nodes.front()->getType() != NodeType::TRAINING) {
+               SINGLEO_LOGE("The first node should be a task node such as InferenceNode or TrainingNode.");
+               throw InvalidOperation("The first node should be a task node such as InferenceNode or TrainingNode.");
+       }
+
+       // Last node should be EndpointNode.
+       if (_nodes.back()->getType() != NodeType::ENDPOINT) {
+               SINGLEO_LOGE("The last node should be a EndpointNode.");
+               throw InvalidOperation("The last node should be a EndpointNode.");
+       }
+
+       // Verify if _nodes is DAG(Directed Acyclic Graph) or not.
+       for (auto &node : _nodes) {
+               // BridgeNode should have a callback but it's optional for other types of callback nodes.
+               if (node->getType() == NodeType::BRIDGE) {
+                       if (!dynamic_pointer_cast<BridgeNode>(node)->getCb()) {
+                               SINGLEO_LOGE("The bridge node should have a callback.");
+                               throw InvalidOperation("The bridge node should have a callback.");
+                       }
+
+                       // Verify if BridgeNode is located bewteen two task nodes such as InferenceNode and TrainingNode.
+                       for (auto &d_node : node->getDependencies()) {
+                               degreeMap[d_node]++;
+
+                               if (d_node->getType() != NodeType::INFERENCE && d_node->getType() != NodeType::TRAINING) {
+                                       SINGLEO_LOGE("The bridge node should be located between two task nodes such as InferenceNode or TrainingNode.");
+                                       throw InvalidOperation("The bridge node should be located between two task nodes such as InferenceNode or TrainingNode.");
+                               }
+                       }
+               } else if (node->getType() == NodeType::INFERENCE || node->getType() == NodeType::TRAINING) {
+                       // Verify if InferenceNode or TrainingNode is located prior to BridgeNode.
+                       for (auto &d_node : node->getDependencies()) {
+                               degreeMap[d_node]++;
+
+                               if (d_node->getType() != NodeType::BRIDGE) {
+                                       SINGLEO_LOGE("The InferenceNode or TrainingNode should be located prior to BridgeNode.");
+                                       throw InvalidOperation("The InferenceNode or TrainingNode should be located prior to BridgeNode.");
+                               }
+                       }
+               }
+       }
+
+       queue<shared_ptr<INode> > zeroDegreeQ;
+
+       for (auto &node : _nodes)
+               if (degreeMap[node] == 0)
+                       zeroDegreeQ.push(node);
+
+       size_t visitedCount = 0;
+
+       while (!zeroDegreeQ.empty()) {
+               auto node = zeroDegreeQ.front();
+
+               zeroDegreeQ.pop();
+               visitedCount++;
+
+               for (auto &d_node : node->getDependencies()) {
+                       if (--degreeMap[d_node] == 0)
+                               zeroDegreeQ.push(d_node);
+               }
+       }
+
+       if (visitedCount != _nodes.size()) {
+               SINGLEO_LOGE("Invalid graph.");
+               throw InvalidOperation("Invalid graph.");
+       }
+}
+
 void TaskManager::run()
 {
        if (_inputs.empty()) {
@@ -154,13 +226,7 @@ void TaskManager::run()
                throw InvalidOperation("No input source.");
        }
 
-       // TODO. verify if current graph is valid or not here like below,
-       // 1. This graph should be DAG.
-       // 2. Start node should be Job node such as InferenceNode and TrainingNode.
-       // 3. Last node should be EndpointNode.
-       // 4. BridgeNode should be placed bewteen two Job nodes such as InferenceNode and TrainingNode.
-       // 5. BridgeNode should have a valid callback but it's optional for other types of callback nodes.
-       // 6. Multiple endpoint nodes can exist.
+       verifyGraph();
 
        std::vector<std::unique_ptr<std::thread> > threads;