Revert "task_manager: add RunQueueManager support" 65/313565/1
authorInki Dae <inki.dae@samsung.com>
Thu, 27 Jun 2024 23:45:32 +0000 (08:45 +0900)
committerInki Dae <inki.dae@samsung.com>
Thu, 27 Jun 2024 23:45:44 +0000 (08:45 +0900)
This reverts commit 45564ae69a0dbc36ee9ee55e352e859fe83d653c.

RunQueueManager made the BFS behavior for visiting all nodes in the graph
to be broken. So critical problem happended. Revert it.

To hide node actions into node classes, we have to find another idea.

Change-Id: If65b5cc8cedef483a89d40198975a77322ccfe29

13 files changed:
services/auto_zoom/src/AutoZoom.cpp
services/common/include/AsyncManager.h
services/task_manager/include/BridgeNode.h
services/task_manager/include/CallbackNode.h
services/task_manager/include/EndpointNode.h
services/task_manager/include/INode.h
services/task_manager/include/InferenceNode.h
services/task_manager/include/RunQueueManager.h [deleted file]
services/task_manager/include/TaskManager.h
services/task_manager/include/TaskNode.h
services/task_manager/include/TrainingNode.h
services/task_manager/src/InferenceNode.cpp
services/task_manager/src/TaskManager.cpp

index 466ef4515eb6048e2e1219c0e5bd7c5a1bb886bf..bf9a2f620ca89286f7452b567016c564d6ecef35 100644 (file)
@@ -145,10 +145,8 @@ void AutoZoom::configure(InputConfigBase &config)
 
 AutoZoom::~AutoZoom()
 {
-       if (_async_mode) {
+       if (_async_mode)
                _input_service->streamOff();
-               _async_manager->clear();
-       }
 
        _taskManager->clear();
 }
index 6ff9361f5a44ea7a586929697aeab3e44269215e..ee32192b244fe6dde4827b898cc108c641b09f58 100644 (file)
@@ -117,9 +117,7 @@ public:
                _thread_handle = std::make_unique<std::thread>(&AsyncManager::invokeThread, this);
        }
 
-       ~AsyncManager() = default;
-
-       void clear()
+       ~AsyncManager()
        {
                _exit_thread = true;
                _thread_handle->join();
index 5c8fc2f92c70ff82ea67f8846eb010a7f3e9562f..078762a973059b5ceb68829b47caee65bfd7e44f 100644 (file)
@@ -57,26 +57,14 @@ public:
                _inputBuffer->release();
                _enabled = false;
 
-               if (!_outputBuffer)
-                       return;
-
-               _enabled = true;
+               // Bridge node got the result from previous task node so enable this bridge node.
+               if (_outputBuffer)
+                       _enabled = true;
        }
 
-       void updateRunQueue(RunQueueManager *runQueueManager) final
+       bool isEnabled()
        {
-               // If this bridge node failed then wakeup all next nodes because
-               // other nodes can wait for them to finish. In this case,
-               // the next nodes aren't invoked.
-               if (!_enabled) {
-                       for (const auto &n : _nexts)
-                               n->wakeup();
-
-                       return;
-               }
-
-               for (const auto &n : _nexts)
-                       runQueueManager->pushNode(n);
+               return _enabled;
        }
 };
 
index e0642112f0a0d44d4e1ba1d744cd42ee915569bd..e98333db768c3a7494f8240283fabb2d197bf019 100644 (file)
@@ -21,7 +21,6 @@
 #include <condition_variable>
 
 #include "INode.h"
-#include "RunQueueManager.h"
 #include "SingleoException.h"
 #include "SharedBuffer.h"
 
@@ -51,10 +50,6 @@ public:
        CallbackNode() = default;
        virtual ~CallbackNode() = default;
 
-       virtual void configure() = 0;
-       virtual void invoke() = 0;
-       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
-
        NodeType getType() override;
        std::string &getName() override
        {
@@ -69,6 +64,8 @@ public:
        std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
        void wait() override;
        void wakeup() override;
+       virtual void configure() = 0;
+       virtual void invoke() = 0;
        void clear() override;
        void setCb(const NodeCb &cb);
        std::vector<std::shared_ptr<BaseResultType> > &results() override;
index 7e1f6d1759188174c062ee063a4cd04691acefc5..771b1e284ccfa359ff5a23514e9b62f8dc9a508f 100644 (file)
@@ -50,9 +50,6 @@ public:
 
                _inputBuffer->release();
        }
-
-       void updateRunQueue(RunQueueManager *runQueueManager) override
-       {}
 };
 
 }
index 5cc04ce4354b81f0a25a0052354f385313352934..e15604d30fedb1daa8c25cc2d2e06b4b1829c9a1 100644 (file)
@@ -30,7 +30,6 @@ namespace singleo
 namespace services
 {
 enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, ENDPOINT };
-class RunQueueManager;
 
 class INode
 {
@@ -53,7 +52,6 @@ public:
        virtual void wait() = 0;
        virtual void wakeup() = 0;
        virtual void clear() = 0;
-       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
 };
 
 using NodeCb = std::function<void(INode *node)>;
index e810f241e15bd49921c8ad8a220eb33f1f471838..dc6f7793984afe55dfe9974eceede7792ca8ae70 100644 (file)
@@ -18,7 +18,6 @@
 #define __INFERENCE_NODE_H__
 
 #include <mutex>
-#include <queue>
 #include <condition_variable>
 
 #include "TaskNode.h"
@@ -48,7 +47,6 @@ public:
        void configure() final;
        void invoke() final;
        std::vector<std::shared_ptr<BaseResultType> > &results() final;
-       void updateRunQueue(RunQueueManager *runQueueManager) override;
 };
 
 }
diff --git a/services/task_manager/include/RunQueueManager.h b/services/task_manager/include/RunQueueManager.h
deleted file mode 100644 (file)
index 5f90653..0000000
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __RUN_QUEUE_MANAGER_H__
-#define __RUN_QUEUE_MANAGER_H__
-
-#include <queue>
-#include <unordered_set>
-#include <memory>
-#include <mutex>
-
-#include "INode.h"
-#include "SingleoLog.h"
-
-namespace singleo
-{
-namespace services
-{
-class RunQueueManager
-{
-private:
-       std::unordered_set<std::shared_ptr<INode> > _uniqueRunNodes;
-       std::queue<std::shared_ptr<INode> > _runQueue;
-       std::mutex _runQueueMutex;
-
-       bool isDuplicated(const std::shared_ptr<INode> &node)
-       {
-               return _uniqueRunNodes.find(node) != _uniqueRunNodes.end();
-       }
-
-public:
-       void pushNode(const std::shared_ptr<INode> node)
-       {
-               std::lock_guard<std::mutex> lock(_runQueueMutex);
-
-               if (isDuplicated(node))
-                       return;
-
-               _runQueue.push(node);
-               _uniqueRunNodes.insert(node);
-       }
-
-       std::shared_ptr<INode> &popNode()
-       {
-               std::lock_guard<std::mutex> lock(_runQueueMutex);
-               auto &node = _runQueue.front();
-
-               _runQueue.pop();
-
-               return node;
-       }
-
-       bool isEmpty()
-       {
-               std::lock_guard<std::mutex> lock(_runQueueMutex);
-
-               return _runQueue.empty();
-       }
-
-       void clear()
-       {
-               std::lock_guard<std::mutex> lock(_runQueueMutex);
-
-               _uniqueRunNodes.clear();
-       }
-
-public:
-       RunQueueManager() = default;
-       ~RunQueueManager() = default;
-};
-
-}
-}
-
-#endif
\ No newline at end of file
index 34f303e2abb438584ecba6ea7bebcb36ef8a6cb1..16761a0e192723df2b8e1fa57dcdd2bba8f9e221 100644 (file)
@@ -27,7 +27,6 @@
 #include "IInferenceTaskInterface.h"
 #include "SingleoCommonTypes.h"
 #include "INode.h"
-#include "RunQueueManager.h"
 
 namespace singleo
 {
@@ -40,8 +39,9 @@ private:
        std::vector<std::shared_ptr<INode> > _nodes;
        std::vector<std::shared_ptr<BaseResultType> > _results;
        std::queue<std::shared_ptr<std::thread> > _threads;
+       std::unordered_set<std::shared_ptr<INode> > _is_thread_created;
+
        std::mutex _thread_mutex;
-       std::unique_ptr<RunQueueManager> _runQueueManager;
 
        void threadCb(std::shared_ptr<INode> &node);
        void verifyGraph();
index 65218290a1f08bb1d4be06856c33f0d6e7af2d96..c32fa21430e01075b7e2a5b4521485e808bf3e34 100644 (file)
@@ -18,7 +18,6 @@
 #define __TASK_NODE_H__
 
 #include <mutex>
-#include <queue>
 #include <condition_variable>
 
 #include "INode.h"
@@ -67,7 +66,6 @@ public:
        virtual void invoke() = 0;
        void clear() override;
        virtual std::vector<std::shared_ptr<BaseResultType> > &results() = 0;
-       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
 };
 
 }
index e5d2ab382df97c96362459fe2e4759eb9ff35388..16ddaa06eb6d3820231eba9dfdc253add0aa19bb 100644 (file)
@@ -18,7 +18,6 @@
 #define __TRAINING_NODE_H__
 
 #include <mutex>
-#include <queue>
 #include <condition_variable>
 
 #include "TaskNode.h"
@@ -56,9 +55,6 @@ public:
        {
                // TODO. implement results here.
        }
-
-       void updateRunQueue(RunQueueManager *runQueueManager) override
-       {}
 };
 
 }
index ef0c6bdfac354b2cf3a222ba656ebca085d7101f..a04176ba08bc8c450b44cb3535494dcbe5287e21 100644 (file)
@@ -16,7 +16,6 @@
 
 #include "SingleoLog.h"
 #include "InferenceNode.h"
-#include "RunQueueManager.h"
 
 using namespace std;
 using namespace singleo::inference;
@@ -59,11 +58,5 @@ std::vector<std::shared_ptr<BaseResultType> > &InferenceNode::results()
        return _results;
 }
 
-void InferenceNode::updateRunQueue(RunQueueManager *runQueueManager)
-{
-       for (auto &n : _nexts)
-               runQueueManager->pushNode(n);
-}
-
 }
 }
\ No newline at end of file
index ed9e29b76574395e8788b496ee3a30f1d4efbfe4..73b1d6a07a16e3339bb7b5a5210cbbc7d138325c 100644 (file)
@@ -39,6 +39,8 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
        for (auto &d : node->getDependencies())
                d->wait();
 
+       SINGLEO_LOGD("Launched node name = %s", node->getName().c_str());
+
        if (node->getType() == NodeType::INFERENCE) {
                if (_inputs[0]->_data_type != DataType::IMAGE) {
                        SINGLEO_LOGE("Invalid input data type.");
@@ -78,15 +80,24 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
        node->invoke();
        node->wakeup();
 
-       node->updateRunQueue(_runQueueManager.get());
-
-       while (!_runQueueManager->isEmpty()) {
-               auto &n = _runQueueManager->popNode();
+       // Spawn threads for next nodes
+       for (auto &n : node->getNexts()) {
+               if (node->getType() == NodeType::BRIDGE) {
+                       auto b_node = dynamic_pointer_cast<BridgeNode>(node);
 
-               std::unique_lock<std::mutex> lock(_thread_mutex);
+                       // In case of BRIDGE node, if this bridge node didn't get the result from previous task node,
+                       // isEnabled() is false. So if isEnabled() is false, stop all sub graph pipelines connected to this node.
+                       if (!b_node->isEnabled()) {
+                               n->wakeup();
+                               continue;
+                       }
+               }
 
-               _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
-               _thread_mutex.unlock();
+               std::lock_guard<std::mutex> lock(_thread_mutex);
+               if (_is_thread_created.find(n) == _is_thread_created.end()) {
+                       _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+                       _is_thread_created.insert(n);
+               }
        }
 }
 
@@ -236,8 +247,6 @@ void TaskManager::run()
 
        auto inputBuffer = make_shared<SharedBuffer>();
 
-       _runQueueManager = make_unique<RunQueueManager>();
-
        for (auto &i : _inputs)
                inputBuffer->addInput(i);
 
@@ -252,29 +261,27 @@ void TaskManager::run()
                                throw InvalidOperation("root node should be inference node.");
                        }
 
-                       _runQueueManager->pushNode(n);
+                       // TODO. consider for multiple sources later.
+                       n->setInputBuffer(inputBuffer);
+                       _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+                       _is_thread_created.insert(n);
                }
        }
 
-       while (!_runQueueManager->isEmpty()) {
-               auto &n = _runQueueManager->popNode();
-
-               n->setInputBuffer(inputBuffer);
-               _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
-       }
-
        while (true) {
                std::unique_lock<std::mutex> lock(_thread_mutex);
                if (_threads.empty())
                        break;
 
                auto t = _threads.front();
-
                _threads.pop();
+
                lock.unlock();
+
                t->join();
        }
 
+       _is_thread_created.clear();
        _inputs.clear();
 
        // the result has been returned to user so clear each node.
@@ -302,7 +309,7 @@ void TaskManager::clear()
        _inputs.clear();
        _nodes.clear();
        _results.clear();
-       _runQueueManager->clear();
+       _is_thread_created.clear();
 }
 
 }