*/
#include <cstring>
+#include <queue>
+#include <map>
+
#include "SingleoException.h"
#include "SingleoLog.h"
#include "TaskManager.h"
auto callbackNode = dynamic_pointer_cast<CallbackNode>(node);
callbackNode->clearResults();
- auto cb = callbackNode->getCb();
- if (node->getType() == NodeType::BRIDGE && !cb) {
- SINGLEO_LOGE("Bridge node needs callback.");
- throw InvalidOperation("Bridge node needs callback.");
- }
for (auto &n : node->getDependencies()) {
if (n->getType() != NodeType::INFERENCE) {
// Call the callback function registered in this callback node.
// In this callback, new data should be set by calling node->setOutput()
// if new data is made. TODO. consider for multiple inputs later.
+ auto cb = callbackNode->getCb();
if (cb)
cb(node);
}
}
+void TaskManager::verifyGraph()
+{
+ map<shared_ptr<INode>, unsigned int> degreeMap;
+
+ // Start node should be a task node such as InferenceNode and TrainingNode.
+ if (_nodes.front()->getType() != NodeType::INFERENCE && _nodes.front()->getType() != NodeType::TRAINING) {
+ SINGLEO_LOGE("The first node should be a task node such as InferenceNode or TrainingNode.");
+ throw InvalidOperation("The first node should be a task node such as InferenceNode or TrainingNode.");
+ }
+
+ // Last node should be EndpointNode.
+ if (_nodes.back()->getType() != NodeType::ENDPOINT) {
+ SINGLEO_LOGE("The last node should be a EndpointNode.");
+ throw InvalidOperation("The last node should be a EndpointNode.");
+ }
+
+ // Verify if _nodes is DAG(Directed Acyclic Graph) or not.
+ for (auto &node : _nodes) {
+ // BridgeNode should have a callback but it's optional for other types of callback nodes.
+ if (node->getType() == NodeType::BRIDGE) {
+ if (!dynamic_pointer_cast<BridgeNode>(node)->getCb()) {
+ SINGLEO_LOGE("The bridge node should have a callback.");
+ throw InvalidOperation("The bridge node should have a callback.");
+ }
+
+ // Verify if BridgeNode is located bewteen two task nodes such as InferenceNode and TrainingNode.
+ for (auto &d_node : node->getDependencies()) {
+ degreeMap[d_node]++;
+
+ if (d_node->getType() != NodeType::INFERENCE && d_node->getType() != NodeType::TRAINING) {
+ SINGLEO_LOGE("The bridge node should be located between two task nodes such as InferenceNode or TrainingNode.");
+ throw InvalidOperation("The bridge node should be located between two task nodes such as InferenceNode or TrainingNode.");
+ }
+ }
+ } else if (node->getType() == NodeType::INFERENCE || node->getType() == NodeType::TRAINING) {
+ // Verify if InferenceNode or TrainingNode is located prior to BridgeNode.
+ for (auto &d_node : node->getDependencies()) {
+ degreeMap[d_node]++;
+
+ if (d_node->getType() != NodeType::BRIDGE) {
+ SINGLEO_LOGE("The InferenceNode or TrainingNode should be located prior to BridgeNode.");
+ throw InvalidOperation("The InferenceNode or TrainingNode should be located prior to BridgeNode.");
+ }
+ }
+ }
+ }
+
+ queue<shared_ptr<INode> > zeroDegreeQ;
+
+ for (auto &node : _nodes)
+ if (degreeMap[node] == 0)
+ zeroDegreeQ.push(node);
+
+ size_t visitedCount = 0;
+
+ while (!zeroDegreeQ.empty()) {
+ auto node = zeroDegreeQ.front();
+
+ zeroDegreeQ.pop();
+ visitedCount++;
+
+ for (auto &d_node : node->getDependencies()) {
+ if (--degreeMap[d_node] == 0)
+ zeroDegreeQ.push(d_node);
+ }
+ }
+
+ if (visitedCount != _nodes.size()) {
+ SINGLEO_LOGE("Invalid graph.");
+ throw InvalidOperation("Invalid graph.");
+ }
+}
+
void TaskManager::run()
{
if (_inputs.empty()) {
throw InvalidOperation("No input source.");
}
- // TODO. verify if current graph is valid or not here like below,
- // 1. This graph should be DAG.
- // 2. Start node should be Job node such as InferenceNode and TrainingNode.
- // 3. Last node should be EndpointNode.
- // 4. BridgeNode should be placed bewteen two Job nodes such as InferenceNode and TrainingNode.
- // 5. BridgeNode should have a valid callback but it's optional for other types of callback nodes.
- // 6. Multiple endpoint nodes can exist.
+ verifyGraph();
std::vector<std::unique_ptr<std::thread> > threads;