{
namespace services
{
-class CallbackNode : public INode
+class CallbackNode : public INode, public std::enable_shared_from_this<CallbackNode>
{
protected:
NodeType _type { NodeType::NONE };
std::string _name;
std::vector<std::shared_ptr<INode> > _dependencies;
+ std::vector<std::shared_ptr<INode> > _nexts;
NodeCb _cb;
std::shared_ptr<SharedBuffer> _inputBuffer;
std::shared_ptr<SharedBuffer> _outputBuffer;
std::mutex _mutex;
bool isNodeDuplicated(const std::vector<std::shared_ptr<INode> > &target, const std::shared_ptr<INode> &newNode);
+ void addNext(std::shared_ptr<INode> node) override;
public:
CallbackNode() = default;
std::shared_ptr<SharedBuffer> &getInputBuffer() override;
void addDependency(std::shared_ptr<INode> node) override;
std::vector<std::shared_ptr<INode> > &getDependencies() override;
+ std::vector<std::shared_ptr<INode> > &getNexts() override;
void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) override;
std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
void wait() override;
virtual void setInputBuffer(std::shared_ptr<SharedBuffer> inputBuffer) = 0;
virtual std::shared_ptr<SharedBuffer> &getInputBuffer() = 0;
virtual void addDependency(std::shared_ptr<INode> node) = 0;
+ virtual void addNext(std::shared_ptr<INode> node) = 0;
+ virtual std::vector<std::shared_ptr<INode> > &getNexts() = 0;
virtual std::vector<std::shared_ptr<INode> > &getDependencies() = 0;
virtual void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) = 0;
virtual std::shared_ptr<SharedBuffer> &getOutputBuffer() = 0;
{
namespace services
{
-class TaskNode : public INode
+class TaskNode : public INode, public std::enable_shared_from_this<TaskNode>
{
protected:
NodeType _type { NodeType::NONE };
std::string _name;
std::vector<std::shared_ptr<INode> > _dependencies;
+ std::vector<std::shared_ptr<INode> > _nexts;
std::shared_ptr<SharedBuffer> _inputBuffer;
std::shared_ptr<SharedBuffer> _outputBuffer;
bool _completed { false };
std::mutex _mutex;
bool isNodeDuplicated(const std::vector<std::shared_ptr<INode> > &target, const std::shared_ptr<INode> &newNode);
+ void addNext(std::shared_ptr<INode> node) override;
public:
TaskNode() = default;
std::shared_ptr<SharedBuffer> &getInputBuffer() override;
void addDependency(std::shared_ptr<INode> node) override;
std::vector<std::shared_ptr<INode> > &getDependencies() override;
+ std::vector<std::shared_ptr<INode> > &getNexts() override;
void setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer) override;
std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
void wait() override;
}
_dependencies.push_back(node);
+ node->addNext(shared_ptr<INode>(shared_from_this()));
}
std::vector<std::shared_ptr<INode> > &CallbackNode::getDependencies()
return _dependencies;
}
+void CallbackNode::addNext(shared_ptr<INode> node)
+{
+ if (isNodeDuplicated(_nexts, node)) {
+ SINGLEO_LOGE("A given node has already been registered.");
+ throw InvalidOperation("A given node has already been registered.");
+ }
+
+ _nexts.push_back(node);
+}
+
+std::vector<std::shared_ptr<INode> > &CallbackNode::getNexts()
+{
+ return _nexts;
+}
+
void CallbackNode::setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer)
{
outputBuffer->addRef();
void TaskManager::verifyGraph()
{
- map<shared_ptr<INode>, unsigned int> degreeMap;
-
// Start node should be a task node such as InferenceNode and TrainingNode.
if (_nodes.front()->getType() != NodeType::INFERENCE && _nodes.front()->getType() != NodeType::TRAINING) {
SINGLEO_LOGE("The first node should be a task node such as InferenceNode or TrainingNode.");
throw InvalidOperation("The first node should be a task node such as InferenceNode or TrainingNode.");
}
+ auto &lastNode = _nodes.back();
+
// Last node should be EndpointNode.
- if (_nodes.back()->getType() != NodeType::ENDPOINT) {
+ if (lastNode->getType() != NodeType::ENDPOINT) {
SINGLEO_LOGE("The last node should be a EndpointNode.");
throw InvalidOperation("The last node should be a EndpointNode.");
}
- // Verify if _nodes is DAG(Directed Acyclic Graph) or not.
+ map<shared_ptr<INode>, unsigned int> degreeMap;
+
+ // Verify graph rule.
for (auto &node : _nodes) {
+ if (node->getNexts().size() == 0 && node->getType() != NodeType::ENDPOINT) {
+ SINGLEO_LOGE("Last node should be endpoint node.");
+ throw InvalidOperation("Last node should be endpoint node.");
+ }
+
if (node->getType() == NodeType::BRIDGE) {
// Verify if BridgeNode is located bewteen two task nodes such as InferenceNode and TrainingNode.
+ // ... inference or training node ---- bridge node ---- inference or training node ...
for (auto &d_node : node->getDependencies()) {
degreeMap[d_node]++;
}
}
} else if (node->getType() == NodeType::INFERENCE || node->getType() == NodeType::TRAINING) {
- // Verify if InferenceNode or TrainingNode is located prior to BridgeNode.
+ // Verify if InferenceNode or TrainingNode is located in front of BridgeNode, behind BridgeNode or in front of EndpointNode.
+ // ... inference or training node ---- bridge node ...
+ // ... bridge node ---- inference or training node ...
+ // ... inference or training node ---- endpoint node
for (auto &d_node : node->getDependencies()) {
degreeMap[d_node]++;
if (d_node->getType() != NodeType::BRIDGE) {
- SINGLEO_LOGE("The InferenceNode or TrainingNode should be located prior to BridgeNode.");
- throw InvalidOperation("The InferenceNode or TrainingNode should be located prior to BridgeNode.");
+ SINGLEO_LOGE(
+ "The InferenceNode or TrainingNode should be located in front of BridgeNode, behind BridgeNode or in front of EndpointNode.");
+ throw InvalidOperation(
+ "The InferenceNode or TrainingNode should be located in front of BridgeNode, behind BridgeNode or in front of EndpointNode.");
+ }
+ }
+ } else if (node->getType() == NodeType::ENDPOINT) {
+ // ... inference or training node ---- endpoint node
+ for (auto &d_node : node->getDependencies()) {
+ degreeMap[d_node]++;
+
+ if (d_node->getType() != NodeType::INFERENCE && d_node->getType() != NodeType::TRAINING) {
+ SINGLEO_LOGE("The EndpointNode should be located behind InferenceNode or TrainingNode.");
+ throw InvalidOperation("The EndpointNode should be located behind InferenceNode or TrainingNode.");
}
}
}
}
+ // Verify if _nodes is DAG(Directed Acyclic Graph) or not.
queue<shared_ptr<INode> > zeroDegreeQ;
for (auto &node : _nodes)
}
_dependencies.push_back(node);
+ node->addNext(shared_ptr<INode>(shared_from_this()));
}
std::vector<std::shared_ptr<INode> > &TaskNode::getDependencies()
return _dependencies;
}
+void TaskNode::addNext(shared_ptr<INode> node)
+{
+ if (isNodeDuplicated(_nexts, node)) {
+ SINGLEO_LOGE("A given node has already been registered.");
+ throw InvalidOperation("A given node has already been registered.");
+ }
+
+ _nexts.push_back(node);
+}
+
+std::vector<std::shared_ptr<INode> > &TaskNode::getNexts()
+{
+ return _nexts;
+}
+
void TaskNode::setOutputBuffer(std::shared_ptr<SharedBuffer> outputBuffer)
{
outputBuffer->addRef();
taskManager->addNode(face_landmark_node);
auto endpoint_node = make_shared<EndpointNode>();
- endpoint_node->setCb(nullptr);
endpoint_node->addDependency(face_landmark_node);
taskManager->addNode(endpoint_node);
taskManager->addNode(face_landmark_node);
auto endpoint_node = make_shared<EndpointNode>();
- endpoint_node->setCb(nullptr);
endpoint_node->addDependency(face_landmark_node);
taskManager->addNode(endpoint_node);