From: Inki Dae Date: Thu, 27 Jun 2024 23:45:32 +0000 (+0900) Subject: Revert "task_manager: add RunQueueManager support" X-Git-Tag: accepted/tizen/unified/20240903.110722~17 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=f66cc2824e04d93b7f527e16ba4b7eef9dddda59;p=platform%2Fcore%2Fapi%2Fsingleo.git Revert "task_manager: add RunQueueManager support" This reverts commit 45564ae69a0dbc36ee9ee55e352e859fe83d653c. RunQueueManager made the BFS behavior for visiting all nodes in the graph to be broken. So critical problem happended. Revert it. To hide node actions into node classes, we have to find another idea. Change-Id: If65b5cc8cedef483a89d40198975a77322ccfe29 --- diff --git a/services/auto_zoom/src/AutoZoom.cpp b/services/auto_zoom/src/AutoZoom.cpp index 466ef45..bf9a2f6 100644 --- a/services/auto_zoom/src/AutoZoom.cpp +++ b/services/auto_zoom/src/AutoZoom.cpp @@ -145,10 +145,8 @@ void AutoZoom::configure(InputConfigBase &config) AutoZoom::~AutoZoom() { - if (_async_mode) { + if (_async_mode) _input_service->streamOff(); - _async_manager->clear(); - } _taskManager->clear(); } diff --git a/services/common/include/AsyncManager.h b/services/common/include/AsyncManager.h index 6ff9361..ee32192 100644 --- a/services/common/include/AsyncManager.h +++ b/services/common/include/AsyncManager.h @@ -117,9 +117,7 @@ public: _thread_handle = std::make_unique(&AsyncManager::invokeThread, this); } - ~AsyncManager() = default; - - void clear() + ~AsyncManager() { _exit_thread = true; _thread_handle->join(); diff --git a/services/task_manager/include/BridgeNode.h b/services/task_manager/include/BridgeNode.h index 5c8fc2f..078762a 100644 --- a/services/task_manager/include/BridgeNode.h +++ b/services/task_manager/include/BridgeNode.h @@ -57,26 +57,14 @@ public: _inputBuffer->release(); _enabled = false; - if (!_outputBuffer) - return; - - _enabled = true; + // Bridge node got the result from previous task node so enable this bridge node. + if (_outputBuffer) + _enabled = true; } - void updateRunQueue(RunQueueManager *runQueueManager) final + bool isEnabled() { - // If this bridge node failed then wakeup all next nodes because - // other nodes can wait for them to finish. In this case, - // the next nodes aren't invoked. - if (!_enabled) { - for (const auto &n : _nexts) - n->wakeup(); - - return; - } - - for (const auto &n : _nexts) - runQueueManager->pushNode(n); + return _enabled; } }; diff --git a/services/task_manager/include/CallbackNode.h b/services/task_manager/include/CallbackNode.h index e064211..e98333d 100644 --- a/services/task_manager/include/CallbackNode.h +++ b/services/task_manager/include/CallbackNode.h @@ -21,7 +21,6 @@ #include #include "INode.h" -#include "RunQueueManager.h" #include "SingleoException.h" #include "SharedBuffer.h" @@ -51,10 +50,6 @@ public: CallbackNode() = default; virtual ~CallbackNode() = default; - virtual void configure() = 0; - virtual void invoke() = 0; - virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0; - NodeType getType() override; std::string &getName() override { @@ -69,6 +64,8 @@ public: std::shared_ptr &getOutputBuffer() override; void wait() override; void wakeup() override; + virtual void configure() = 0; + virtual void invoke() = 0; void clear() override; void setCb(const NodeCb &cb); std::vector > &results() override; diff --git a/services/task_manager/include/EndpointNode.h b/services/task_manager/include/EndpointNode.h index 7e1f6d1..771b1e2 100644 --- a/services/task_manager/include/EndpointNode.h +++ b/services/task_manager/include/EndpointNode.h @@ -50,9 +50,6 @@ public: _inputBuffer->release(); } - - void updateRunQueue(RunQueueManager *runQueueManager) override - {} }; } diff --git a/services/task_manager/include/INode.h b/services/task_manager/include/INode.h index 5cc04ce..e15604d 100644 --- a/services/task_manager/include/INode.h +++ b/services/task_manager/include/INode.h @@ -30,7 +30,6 @@ namespace singleo namespace services { enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, ENDPOINT }; -class RunQueueManager; class INode { @@ -53,7 +52,6 @@ public: virtual void wait() = 0; virtual void wakeup() = 0; virtual void clear() = 0; - virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0; }; using NodeCb = std::function; diff --git a/services/task_manager/include/InferenceNode.h b/services/task_manager/include/InferenceNode.h index e810f24..dc6f779 100644 --- a/services/task_manager/include/InferenceNode.h +++ b/services/task_manager/include/InferenceNode.h @@ -18,7 +18,6 @@ #define __INFERENCE_NODE_H__ #include -#include #include #include "TaskNode.h" @@ -48,7 +47,6 @@ public: void configure() final; void invoke() final; std::vector > &results() final; - void updateRunQueue(RunQueueManager *runQueueManager) override; }; } diff --git a/services/task_manager/include/RunQueueManager.h b/services/task_manager/include/RunQueueManager.h deleted file mode 100644 index 5f90653..0000000 --- a/services/task_manager/include/RunQueueManager.h +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __RUN_QUEUE_MANAGER_H__ -#define __RUN_QUEUE_MANAGER_H__ - -#include -#include -#include -#include - -#include "INode.h" -#include "SingleoLog.h" - -namespace singleo -{ -namespace services -{ -class RunQueueManager -{ -private: - std::unordered_set > _uniqueRunNodes; - std::queue > _runQueue; - std::mutex _runQueueMutex; - - bool isDuplicated(const std::shared_ptr &node) - { - return _uniqueRunNodes.find(node) != _uniqueRunNodes.end(); - } - -public: - void pushNode(const std::shared_ptr node) - { - std::lock_guard lock(_runQueueMutex); - - if (isDuplicated(node)) - return; - - _runQueue.push(node); - _uniqueRunNodes.insert(node); - } - - std::shared_ptr &popNode() - { - std::lock_guard lock(_runQueueMutex); - auto &node = _runQueue.front(); - - _runQueue.pop(); - - return node; - } - - bool isEmpty() - { - std::lock_guard lock(_runQueueMutex); - - return _runQueue.empty(); - } - - void clear() - { - std::lock_guard lock(_runQueueMutex); - - _uniqueRunNodes.clear(); - } - -public: - RunQueueManager() = default; - ~RunQueueManager() = default; -}; - -} -} - -#endif \ No newline at end of file diff --git a/services/task_manager/include/TaskManager.h b/services/task_manager/include/TaskManager.h index 34f303e..16761a0 100644 --- a/services/task_manager/include/TaskManager.h +++ b/services/task_manager/include/TaskManager.h @@ -27,7 +27,6 @@ #include "IInferenceTaskInterface.h" #include "SingleoCommonTypes.h" #include "INode.h" -#include "RunQueueManager.h" namespace singleo { @@ -40,8 +39,9 @@ private: std::vector > _nodes; std::vector > _results; std::queue > _threads; + std::unordered_set > _is_thread_created; + std::mutex _thread_mutex; - std::unique_ptr _runQueueManager; void threadCb(std::shared_ptr &node); void verifyGraph(); diff --git a/services/task_manager/include/TaskNode.h b/services/task_manager/include/TaskNode.h index 6521829..c32fa21 100644 --- a/services/task_manager/include/TaskNode.h +++ b/services/task_manager/include/TaskNode.h @@ -18,7 +18,6 @@ #define __TASK_NODE_H__ #include -#include #include #include "INode.h" @@ -67,7 +66,6 @@ public: virtual void invoke() = 0; void clear() override; virtual std::vector > &results() = 0; - virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0; }; } diff --git a/services/task_manager/include/TrainingNode.h b/services/task_manager/include/TrainingNode.h index e5d2ab3..16ddaa0 100644 --- a/services/task_manager/include/TrainingNode.h +++ b/services/task_manager/include/TrainingNode.h @@ -18,7 +18,6 @@ #define __TRAINING_NODE_H__ #include -#include #include #include "TaskNode.h" @@ -56,9 +55,6 @@ public: { // TODO. implement results here. } - - void updateRunQueue(RunQueueManager *runQueueManager) override - {} }; } diff --git a/services/task_manager/src/InferenceNode.cpp b/services/task_manager/src/InferenceNode.cpp index ef0c6bd..a04176b 100644 --- a/services/task_manager/src/InferenceNode.cpp +++ b/services/task_manager/src/InferenceNode.cpp @@ -16,7 +16,6 @@ #include "SingleoLog.h" #include "InferenceNode.h" -#include "RunQueueManager.h" using namespace std; using namespace singleo::inference; @@ -59,11 +58,5 @@ std::vector > &InferenceNode::results() return _results; } -void InferenceNode::updateRunQueue(RunQueueManager *runQueueManager) -{ - for (auto &n : _nexts) - runQueueManager->pushNode(n); -} - } } \ No newline at end of file diff --git a/services/task_manager/src/TaskManager.cpp b/services/task_manager/src/TaskManager.cpp index ed9e29b..73b1d6a 100644 --- a/services/task_manager/src/TaskManager.cpp +++ b/services/task_manager/src/TaskManager.cpp @@ -39,6 +39,8 @@ void TaskManager::threadCb(shared_ptr &node) for (auto &d : node->getDependencies()) d->wait(); + SINGLEO_LOGD("Launched node name = %s", node->getName().c_str()); + if (node->getType() == NodeType::INFERENCE) { if (_inputs[0]->_data_type != DataType::IMAGE) { SINGLEO_LOGE("Invalid input data type."); @@ -78,15 +80,24 @@ void TaskManager::threadCb(shared_ptr &node) node->invoke(); node->wakeup(); - node->updateRunQueue(_runQueueManager.get()); - - while (!_runQueueManager->isEmpty()) { - auto &n = _runQueueManager->popNode(); + // Spawn threads for next nodes + for (auto &n : node->getNexts()) { + if (node->getType() == NodeType::BRIDGE) { + auto b_node = dynamic_pointer_cast(node); - std::unique_lock lock(_thread_mutex); + // In case of BRIDGE node, if this bridge node didn't get the result from previous task node, + // isEnabled() is false. So if isEnabled() is false, stop all sub graph pipelines connected to this node. + if (!b_node->isEnabled()) { + n->wakeup(); + continue; + } + } - _threads.push(make_shared(&TaskManager::threadCb, this, std::ref(n))); - _thread_mutex.unlock(); + std::lock_guard lock(_thread_mutex); + if (_is_thread_created.find(n) == _is_thread_created.end()) { + _threads.push(make_shared(&TaskManager::threadCb, this, std::ref(n))); + _is_thread_created.insert(n); + } } } @@ -236,8 +247,6 @@ void TaskManager::run() auto inputBuffer = make_shared(); - _runQueueManager = make_unique(); - for (auto &i : _inputs) inputBuffer->addInput(i); @@ -252,29 +261,27 @@ void TaskManager::run() throw InvalidOperation("root node should be inference node."); } - _runQueueManager->pushNode(n); + // TODO. consider for multiple sources later. + n->setInputBuffer(inputBuffer); + _threads.push(make_shared(&TaskManager::threadCb, this, std::ref(n))); + _is_thread_created.insert(n); } } - while (!_runQueueManager->isEmpty()) { - auto &n = _runQueueManager->popNode(); - - n->setInputBuffer(inputBuffer); - _threads.push(make_shared(&TaskManager::threadCb, this, std::ref(n))); - } - while (true) { std::unique_lock lock(_thread_mutex); if (_threads.empty()) break; auto t = _threads.front(); - _threads.pop(); + lock.unlock(); + t->join(); } + _is_thread_created.clear(); _inputs.clear(); // the result has been returned to user so clear each node. @@ -302,7 +309,7 @@ void TaskManager::clear() _inputs.clear(); _nodes.clear(); _results.clear(); - _runQueueManager->clear(); + _is_thread_created.clear(); } }