From: Inki Dae Date: Mon, 2 Dec 2024 02:13:59 +0000 (+0900) Subject: task_manager: reintroduce topologicalSorting() X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=refs%2Fchanges%2F10%2F315410%2F7;p=platform%2Fcore%2Fapi%2Fsingleo.git task_manager: reintroduce topologicalSorting() Reintroduce topologicalSorting(). We have separated verifyGraph() and topologicalSorting() from run() so no overhead and even we can drop unnecessary interation for finding a last node in output function. The reason I introduce topologicalSorting function are, - to show actual pipeline path to user in runtime for debugging - to add multiple pipelines later (as of now, only one _nodes exists but _nodes will be created by user request) Change-Id: Idc389493a8b73d67a9b17ac26516014704501ed2 Signed-off-by: Inki Dae --- diff --git a/services/task_manager/include/TaskManager.h b/services/task_manager/include/TaskManager.h index 95aeefc..606f17f 100644 --- a/services/task_manager/include/TaskManager.h +++ b/services/task_manager/include/TaskManager.h @@ -37,17 +37,19 @@ class TaskManager private: std::vector > _inputs; std::vector _nodes; + std::vector _active_nodes; std::map _nodes_map; std::vector > _results; std::queue > _threads; std::unordered_set _is_thread_created; - bool _is_graph_verifed { false }; + bool _is_graph_verified { false }; std::mutex _thread_mutex; void addNode(INode *node); void threadCb(INode *node); void verifyGraph(); + void topologicalSorting(); bool isNodeDuplicated(const std::vector &target, const INode *newNode); public: diff --git a/services/task_manager/src/TaskManager.cpp b/services/task_manager/src/TaskManager.cpp index a56c290..d7efb6e 100644 --- a/services/task_manager/src/TaskManager.cpp +++ b/services/task_manager/src/TaskManager.cpp @@ -78,7 +78,7 @@ INode *TaskManager::requestNewNode(NodeType type, std::string nodeName) void TaskManager::setBypass(const std::string &node_name, bool bypass, bool clearResult) { - if (_nodes.empty()) + if (_active_nodes.empty()) return; auto &node = _nodes_map[node_name]; @@ -178,9 +178,53 @@ void TaskManager::addNode(INode *node) _nodes_map[node->getName()] = node; } +void TaskManager::topologicalSorting() +{ + queue Q; + + INode *startpoint_node {}; + for (auto &node : _nodes) { + if (node->getType() == NodeType::STARTPOINT) { + startpoint_node = node; + break; + } + } + + Q.push(startpoint_node); + + INode *endpoint_node {}; + + while (!Q.empty()) { + auto node = Q.front(); + Q.pop(); + + if (node->getType() != NodeType::ENDPOINT) + if (!isNodeDuplicated(_active_nodes, node)) + _active_nodes.push_back(node); + + if (node->getType() == NodeType::ENDPOINT) + endpoint_node = node; + + for (auto &next : node->getNexts()) + Q.push(next); + } + + if (!isNodeDuplicated(_active_nodes, endpoint_node)) + _active_nodes.push_back(endpoint_node); + + string pipeline_path; + + for (auto &node : _active_nodes) + pipeline_path += node->getName() + " -> "; + pipeline_path += "end"; + + SINGLEO_LOGD("pipeline: %s", pipeline_path.c_str()); +} + void TaskManager::verifyGraph() { - // Check if node names are unique + // Check if node names are unique and startpoint node exists + bool startpoint_exists = false; unordered_set node_names; for (auto &n : _nodes) { if (node_names.find(n->getName()) != node_names.end()) { @@ -188,7 +232,16 @@ void TaskManager::verifyGraph() throw InvalidOperation("Node names must be unique."); } node_names.insert(n->getName()); + + if (n->getType() == NodeType::STARTPOINT) + startpoint_exists = true; + } + + if (!startpoint_exists) { + SINGLEO_LOGE("Startpoint node does not exist."); + throw InvalidOperation("Startpoint node does not exist."); } + node_names.clear(); map degreeMap; @@ -273,12 +326,13 @@ void TaskManager::verifyGraph() throw InvalidOperation("Invalid graph."); } - _is_graph_verifed = true; + _is_graph_verified = true; } void TaskManager::verify() { verifyGraph(); + topologicalSorting(); } void TaskManager::run() @@ -288,7 +342,7 @@ void TaskManager::run() throw InvalidOperation("No input source."); } - if (!_is_graph_verifed) { + if (!_is_graph_verified) { SINGLEO_LOGE("Graph is not verified."); throw InvalidOperation("Graph is not verified."); } @@ -298,7 +352,7 @@ void TaskManager::run() for (auto &i : _inputs) inputBuffer->addInput(i); - for (auto &n : _nodes) { + for (auto &n : _active_nodes) { // Do not run startpoint node which is just empty node. if (n->getType() == NodeType::STARTPOINT) continue; @@ -342,24 +396,19 @@ void TaskManager::run() // the result has been returned to user so clear each node. // Ps. clear() of each node should be called after graph pipeline is completed. // because SharedBuffer can be shared between nodes. - for (auto &node : _nodes) + for (auto &node : _active_nodes) node->clear(); } vector > &TaskManager::output() { - if (_nodes.empty()) { + if (_active_nodes.empty()) { SINGLEO_LOGE("Node is not set"); throw InvalidOperation("Node is not set"); } _results.clear(); - for (auto &node : _nodes) { - if (node->getType() == NodeType::ENDPOINT) { - _results = node->results(); - break; - } - } + _results = _active_nodes.back()->results(); return _results; } @@ -371,11 +420,12 @@ void TaskManager::resetPipeline() node->clearDependency(); _nodes.clear(); + _active_nodes.clear(); } void TaskManager::addEdge(const string &node_a_name, const string &node_b_name) { - if (_is_graph_verifed) { + if (_is_graph_verified) { SINGLEO_LOGE("Graph has been freezed."); throw InvalidOperation("Graph has been freezed."); } @@ -408,9 +458,10 @@ void TaskManager::clear() delete node; } _nodes.clear(); + _active_nodes.clear(); _results.clear(); _is_thread_created.clear(); - _is_graph_verifed = false; + _is_graph_verified = false; SharedBuffer::checkMemoryLeak(); }