void TaskManager::setBypass(const std::string &node_name, bool bypass, bool clearResult)
{
- if (_nodes.empty())
+ if (_active_nodes.empty())
return;
auto &node = _nodes_map[node_name];
_nodes_map[node->getName()] = node;
}
+void TaskManager::topologicalSorting()
+{
+ queue<INode *> Q;
+
+ INode *startpoint_node {};
+ for (auto &node : _nodes) {
+ if (node->getType() == NodeType::STARTPOINT) {
+ startpoint_node = node;
+ break;
+ }
+ }
+
+ Q.push(startpoint_node);
+
+ INode *endpoint_node {};
+
+ while (!Q.empty()) {
+ auto node = Q.front();
+ Q.pop();
+
+ if (node->getType() != NodeType::ENDPOINT)
+ if (!isNodeDuplicated(_active_nodes, node))
+ _active_nodes.push_back(node);
+
+ if (node->getType() == NodeType::ENDPOINT)
+ endpoint_node = node;
+
+ for (auto &next : node->getNexts())
+ Q.push(next);
+ }
+
+ if (!isNodeDuplicated(_active_nodes, endpoint_node))
+ _active_nodes.push_back(endpoint_node);
+
+ string pipeline_path;
+
+ for (auto &node : _active_nodes)
+ pipeline_path += node->getName() + " -> ";
+ pipeline_path += "end";
+
+ SINGLEO_LOGD("pipeline: %s", pipeline_path.c_str());
+}
+
void TaskManager::verifyGraph()
{
- // Check if node names are unique
+ // Check if node names are unique and startpoint node exists
+ bool startpoint_exists = false;
unordered_set<string> node_names;
for (auto &n : _nodes) {
if (node_names.find(n->getName()) != node_names.end()) {
throw InvalidOperation("Node names must be unique.");
}
node_names.insert(n->getName());
+
+ if (n->getType() == NodeType::STARTPOINT)
+ startpoint_exists = true;
+ }
+
+ if (!startpoint_exists) {
+ SINGLEO_LOGE("Startpoint node does not exist.");
+ throw InvalidOperation("Startpoint node does not exist.");
}
+
node_names.clear();
map<INode *, unsigned int> degreeMap;
throw InvalidOperation("Invalid graph.");
}
- _is_graph_verifed = true;
+ _is_graph_verified = true;
}
void TaskManager::verify()
{
verifyGraph();
+ topologicalSorting();
}
void TaskManager::run()
throw InvalidOperation("No input source.");
}
- if (!_is_graph_verifed) {
+ if (!_is_graph_verified) {
SINGLEO_LOGE("Graph is not verified.");
throw InvalidOperation("Graph is not verified.");
}
for (auto &i : _inputs)
inputBuffer->addInput(i);
- for (auto &n : _nodes) {
+ for (auto &n : _active_nodes) {
// Do not run startpoint node which is just empty node.
if (n->getType() == NodeType::STARTPOINT)
continue;
// the result has been returned to user so clear each node.
// Ps. clear() of each node should be called after graph pipeline is completed.
// because SharedBuffer can be shared between nodes.
- for (auto &node : _nodes)
+ for (auto &node : _active_nodes)
node->clear();
}
vector<shared_ptr<BaseResultType> > &TaskManager::output()
{
- if (_nodes.empty()) {
+ if (_active_nodes.empty()) {
SINGLEO_LOGE("Node is not set");
throw InvalidOperation("Node is not set");
}
_results.clear();
- for (auto &node : _nodes) {
- if (node->getType() == NodeType::ENDPOINT) {
- _results = node->results();
- break;
- }
- }
+ _results = _active_nodes.back()->results();
return _results;
}
node->clearDependency();
_nodes.clear();
+ _active_nodes.clear();
}
void TaskManager::addEdge(const string &node_a_name, const string &node_b_name)
{
- if (_is_graph_verifed) {
+ if (_is_graph_verified) {
SINGLEO_LOGE("Graph has been freezed.");
throw InvalidOperation("Graph has been freezed.");
}
delete node;
}
_nodes.clear();
+ _active_nodes.clear();
_results.clear();
_is_thread_created.clear();
- _is_graph_verifed = false;
+ _is_graph_verified = false;
SharedBuffer::checkMemoryLeak();
}