task_manager: add RunQueueManager support 90/312790/5
authorInki Dae <inki.dae@samsung.com>
Fri, 14 Jun 2024 03:11:28 +0000 (12:11 +0900)
committerInki Dae <inki.dae@samsung.com>
Wed, 19 Jun 2024 00:17:05 +0000 (09:17 +0900)
Add a RunQueueManager class which schedules the nodes of a graph at runtime.

With this patch, we can drop some node dependencies from the task manager
by making each concrete node update the next nodes, which will then be
scheduled by the task manager.

Change-Id: I7c448aa5ae579b2f61ec2e5b765789f2e82966ad
Signed-off-by: Inki Dae <inki.dae@samsung.com>
13 files changed:
services/auto_zoom/src/AutoZoom.cpp
services/common/include/AsyncManager.h
services/task_manager/include/BridgeNode.h
services/task_manager/include/CallbackNode.h
services/task_manager/include/EndpointNode.h
services/task_manager/include/INode.h
services/task_manager/include/InferenceNode.h
services/task_manager/include/RunQueueManager.h [new file with mode: 0644]
services/task_manager/include/TaskManager.h
services/task_manager/include/TaskNode.h
services/task_manager/include/TrainingNode.h
services/task_manager/src/InferenceNode.cpp
services/task_manager/src/TaskManager.cpp

index b0321c824a2da4fdcf76148379eba1818a0b2e25..ce35e4010d2f4bd1209b73053610b65e73450b72 100644 (file)
@@ -144,8 +144,10 @@ void AutoZoom::configure(InputConfigBase &config)
 
 AutoZoom::~AutoZoom()
 {
-       if (_async_mode)
+       if (_async_mode) {
                _input_service->streamOff();
+               // Shut the async manager down explicitly. NOTE(review): presumably
+               // _async_manager is an AsyncManager, whose destructor is defaulted
+               // by this patch and so no longer joins its thread - confirm.
+               _async_manager->clear();
+       }
 
        _taskManager->clear();
 }
index a03a0731608a0cfaeb7b12117c3702e76815f86f..e45d06650f3db93c02f61868702a3cc295351ec2 100644 (file)
@@ -118,7 +118,9 @@ public:
                _thread_handle = std::make_unique<std::thread>(&AsyncManager::invokeThread, this);
        }
 
-       ~AsyncManager()
+       ~AsyncManager() = default;
+
+       void clear()
        {
                _exit_thread = true;
                _thread_handle->join();
index 078762a973059b5ceb68829b47caee65bfd7e44f..5c8fc2f92c70ff82ea67f8846eb010a7f3e9562f 100644 (file)
@@ -57,14 +57,26 @@ public:
                _inputBuffer->release();
                _enabled = false;
 
-               // Bridge node got the result from previous task node so enable this bridge node.
-               if (_outputBuffer)
-                       _enabled = true;
+               if (!_outputBuffer)
+                       return;
+
+               _enabled = true;
        }
 
-       bool isEnabled()
+       // Pushes the next nodes of this bridge node to the run queue, or only
+       // wakes them up when this bridge node is not enabled.
+       void updateRunQueue(RunQueueManager *runQueueManager) final
        {
-               return _enabled;
+               // This bridge node is not enabled, so its next nodes are not
+               // invoked. Wake all of them up anyway because other nodes can be
+               // waiting for them to finish.
+               if (!_enabled) {
+                       for (const auto &n : _nexts)
+                               n->wakeup();
+
+                       return;
+               }
+
+               for (const auto &n : _nexts)
+                       runQueueManager->pushNode(n);
        }
 };
 
index e98333db768c3a7494f8240283fabb2d197bf019..e0642112f0a0d44d4e1ba1d744cd42ee915569bd 100644 (file)
@@ -21,6 +21,7 @@
 #include <condition_variable>
 
 #include "INode.h"
+#include "RunQueueManager.h"
 #include "SingleoException.h"
 #include "SharedBuffer.h"
 
@@ -50,6 +51,10 @@ public:
        CallbackNode() = default;
        virtual ~CallbackNode() = default;
 
+       virtual void configure() = 0;
+       virtual void invoke() = 0;
+       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
+
        NodeType getType() override;
        std::string &getName() override
        {
@@ -64,8 +69,6 @@ public:
        std::shared_ptr<SharedBuffer> &getOutputBuffer() override;
        void wait() override;
        void wakeup() override;
-       virtual void configure() = 0;
-       virtual void invoke() = 0;
        void clear() override;
        void setCb(const NodeCb &cb);
        std::vector<std::shared_ptr<BaseResultType> > &results() override;
index 771b1e284ccfa359ff5a23514e9b62f8dc9a508f..7e1f6d1759188174c062ee063a4cd04691acefc5 100644 (file)
@@ -50,6 +50,9 @@ public:
 
                _inputBuffer->release();
        }
+
+       // No-op: this node adds no nodes to the run queue.
+       void updateRunQueue(RunQueueManager *runQueueManager) override
+       {}
 };
 
 }
index e15604d30fedb1daa8c25cc2d2e06b4b1829c9a1..5cc04ce4354b81f0a25a0052354f385313352934 100644 (file)
@@ -30,6 +30,7 @@ namespace singleo
 namespace services
 {
 enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, ENDPOINT };
+class RunQueueManager;
 
 class INode
 {
@@ -52,6 +53,7 @@ public:
        virtual void wait() = 0;
        virtual void wakeup() = 0;
        virtual void clear() = 0;
+       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
 };
 
 using NodeCb = std::function<void(INode *node)>;
index dc6f7793984afe55dfe9974eceede7792ca8ae70..e810f241e15bd49921c8ad8a220eb33f1f471838 100644 (file)
@@ -18,6 +18,7 @@
 #define __INFERENCE_NODE_H__
 
 #include <mutex>
+#include <queue>
 #include <condition_variable>
 
 #include "TaskNode.h"
@@ -47,6 +48,7 @@ public:
        void configure() final;
        void invoke() final;
        std::vector<std::shared_ptr<BaseResultType> > &results() final;
+       void updateRunQueue(RunQueueManager *runQueueManager) override;
 };
 
 }
diff --git a/services/task_manager/include/RunQueueManager.h b/services/task_manager/include/RunQueueManager.h
new file mode 100644 (file)
index 0000000..5f90653
--- /dev/null
@@ -0,0 +1,88 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RUN_QUEUE_MANAGER_H__
+#define __RUN_QUEUE_MANAGER_H__
+
+#include <queue>
+#include <unordered_set>
+#include <memory>
+#include <mutex>
+
+#include "INode.h"
+#include "SingleoLog.h"
+
+namespace singleo
+{
+namespace services
+{
+/**
+ * @brief Mutex-protected FIFO of graph nodes that are waiting to be scheduled.
+ *
+ * A node is accepted at most once between two clear() calls; pushing the same
+ * node again is silently ignored.
+ */
+class RunQueueManager
+{
+private:
+       // Every node accepted by pushNode() since the last clear(); used to reject duplicates.
+       std::unordered_set<std::shared_ptr<INode> > _uniqueRunNodes;
+       // Nodes waiting to be handed out by popNode(), in push order.
+       std::queue<std::shared_ptr<INode> > _runQueue;
+       // Nodes already handed out by popNode(). They are parked here, and never
+       // popped, so that the references returned by popNode() stay valid until
+       // clear() or destruction.
+       std::queue<std::shared_ptr<INode> > _poppedNodes;
+       // Guards the three containers above.
+       std::mutex _runQueueMutex;
+
+       // Returns true if the node was already pushed. The caller must hold _runQueueMutex.
+       bool isDuplicated(const std::shared_ptr<INode> &node)
+       {
+               return _uniqueRunNodes.find(node) != _uniqueRunNodes.end();
+       }
+
+public:
+       /**
+        * @brief Appends a node to the run queue unless it was already pushed.
+        *
+        * @param node Node to schedule. Taken by const reference to avoid an
+        *             extra shared_ptr copy; the queue stores its own copy.
+        */
+       void pushNode(const std::shared_ptr<INode> &node)
+       {
+               std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+               if (isDuplicated(node))
+                       return;
+
+               _runQueue.push(node);
+               _uniqueRunNodes.insert(node);
+       }
+
+       /**
+        * @brief Removes the oldest pending node from the run queue.
+        *
+        * The queue must not be empty. Note that a preceding isEmpty() check
+        * and this call are two separate critical sections, so callers that
+        * pop from several threads must make sure only one of them can take
+        * the last node.
+        *
+        * @return Reference to the popped node. It stays valid until clear()
+        *         is called or this object is destroyed.
+        */
+       std::shared_ptr<INode> &popNode()
+       {
+               std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+               // Park a copy before popping: a reference to _runQueue.front()
+               // would dangle as soon as pop() destroys that element.
+               // std::queue is backed by std::deque by default, and pushing at
+               // the end of a deque does not invalidate references to elements
+               // that are already stored.
+               _poppedNodes.push(_runQueue.front());
+               _runQueue.pop();
+
+               return _poppedNodes.back();
+       }
+
+       /**
+        * @brief Tells whether there is no pending node left to pop.
+        */
+       bool isEmpty()
+       {
+               std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+               return _runQueue.empty();
+       }
+
+       /**
+        * @brief Forgets all pushed nodes, pending or already popped.
+        *
+        * This invalidates every reference previously returned by popNode().
+        */
+       void clear()
+       {
+               std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+               _uniqueRunNodes.clear();
+               // std::queue has no clear(); replace the queues with empty ones
+               // so that no stale node survives a clear().
+               _runQueue = std::queue<std::shared_ptr<INode> >();
+               _poppedNodes = std::queue<std::shared_ptr<INode> >();
+       }
+
+public:
+       RunQueueManager() = default;
+       ~RunQueueManager() = default;
+};
+
+}
+}
+
+#endif
\ No newline at end of file
index 16761a0e192723df2b8e1fa57dcdd2bba8f9e221..34f303e2abb438584ecba6ea7bebcb36ef8a6cb1 100644 (file)
@@ -27,6 +27,7 @@
 #include "IInferenceTaskInterface.h"
 #include "SingleoCommonTypes.h"
 #include "INode.h"
+#include "RunQueueManager.h"
 
 namespace singleo
 {
@@ -39,9 +40,8 @@ private:
        std::vector<std::shared_ptr<INode> > _nodes;
        std::vector<std::shared_ptr<BaseResultType> > _results;
        std::queue<std::shared_ptr<std::thread> > _threads;
-       std::unordered_set<std::shared_ptr<INode> > _is_thread_created;
-
        std::mutex _thread_mutex;
+       std::unique_ptr<RunQueueManager> _runQueueManager;
 
        void threadCb(std::shared_ptr<INode> &node);
        void verifyGraph();
index c32fa21430e01075b7e2a5b4521485e808bf3e34..65218290a1f08bb1d4be06856c33f0d6e7af2d96 100644 (file)
@@ -18,6 +18,7 @@
 #define __TASK_NODE_H__
 
 #include <mutex>
+#include <queue>
 #include <condition_variable>
 
 #include "INode.h"
@@ -66,6 +67,7 @@ public:
        virtual void invoke() = 0;
        void clear() override;
        virtual std::vector<std::shared_ptr<BaseResultType> > &results() = 0;
+       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
 };
 
 }
index 16ddaa06eb6d3820231eba9dfdc253add0aa19bb..e5d2ab382df97c96362459fe2e4759eb9ff35388 100644 (file)
@@ -18,6 +18,7 @@
 #define __TRAINING_NODE_H__
 
 #include <mutex>
+#include <queue>
 #include <condition_variable>
 
 #include "TaskNode.h"
@@ -55,6 +56,9 @@ public:
        {
                // TODO. implement results here.
        }
+
+       // No-op: this node currently adds no nodes to the run queue.
+       void updateRunQueue(RunQueueManager *runQueueManager) override
+       {}
 };
 
 }
index a04176ba08bc8c450b44cb3535494dcbe5287e21..ef0c6bdfac354b2cf3a222ba656ebca085d7101f 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "SingleoLog.h"
 #include "InferenceNode.h"
+#include "RunQueueManager.h"
 
 using namespace std;
 using namespace singleo::inference;
@@ -58,5 +59,11 @@ std::vector<std::shared_ptr<BaseResultType> > &InferenceNode::results()
        return _results;
 }
 
+/**
+ * Pushes every node in _nexts to the given run queue.
+ */
+void InferenceNode::updateRunQueue(RunQueueManager *runQueueManager)
+{
+       for (auto &next : _nexts)
+               runQueueManager->pushNode(next);
+}
+
 }
 }
\ No newline at end of file
index 73b1d6a07a16e3339bb7b5a5210cbbc7d138325c..ed9e29b76574395e8788b496ee3a30f1d4efbfe4 100644 (file)
@@ -39,8 +39,6 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
        for (auto &d : node->getDependencies())
                d->wait();
 
-       SINGLEO_LOGD("Launched node name = %s", node->getName().c_str());
-
        if (node->getType() == NodeType::INFERENCE) {
                if (_inputs[0]->_data_type != DataType::IMAGE) {
                        SINGLEO_LOGE("Invalid input data type.");
@@ -80,24 +78,15 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
        node->invoke();
        node->wakeup();
 
-       // Spawn threads for next nodes
-       for (auto &n : node->getNexts()) {
-               if (node->getType() == NodeType::BRIDGE) {
-                       auto b_node = dynamic_pointer_cast<BridgeNode>(node);
+       node->updateRunQueue(_runQueueManager.get());
 
-                       // In case of BRIDGE node, if this bridge node didn't get the result from previous task node,
-                       // isEnabled() is false. So if isEnabled() is false, stop all sub graph pipelines connected to this node.
-                       if (!b_node->isEnabled()) {
-                               n->wakeup();
-                               continue;
-                       }
-               }
+       // Spawn a thread for every node that is currently pending in the run queue.
+       while (!_runQueueManager->isEmpty()) {
+               auto &n = _runQueueManager->popNode();
 
-               std::lock_guard<std::mutex> lock(_thread_mutex);
-               if (_is_thread_created.find(n) == _is_thread_created.end()) {
-                       _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
-                       _is_thread_created.insert(n);
-               }
+               // _threads is also accessed under _thread_mutex by run(), so guard
+               // the push. The scoped lock is released at the end of each
+               // iteration; unlocking _thread_mutex directly would make the
+               // lock's destructor unlock the mutex a second time.
+               std::lock_guard<std::mutex> lock(_thread_mutex);
+
+               _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
        }
 }
 
@@ -247,6 +236,8 @@ void TaskManager::run()
 
        auto inputBuffer = make_shared<SharedBuffer>();
 
+       _runQueueManager = make_unique<RunQueueManager>();
+
        for (auto &i : _inputs)
                inputBuffer->addInput(i);
 
@@ -261,27 +252,29 @@ void TaskManager::run()
                                throw InvalidOperation("root node should be inference node.");
                        }
 
-                       // TODO. consider for multiple sources later.
-                       n->setInputBuffer(inputBuffer);
-                       _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
-                       _is_thread_created.insert(n);
+                       _runQueueManager->pushNode(n);
                }
        }
 
+       while (!_runQueueManager->isEmpty()) {
+               auto &n = _runQueueManager->popNode();
+
+               n->setInputBuffer(inputBuffer);
+               _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+       }
+
        while (true) {
                std::unique_lock<std::mutex> lock(_thread_mutex);
                if (_threads.empty())
                        break;
 
                auto t = _threads.front();
-               _threads.pop();
 
+               _threads.pop();
                lock.unlock();
-
                t->join();
        }
 
-       _is_thread_created.clear();
        _inputs.clear();
 
        // the result has been returned to user so clear each node.
@@ -309,7 +302,7 @@ void TaskManager::clear()
        _inputs.clear();
        _nodes.clear();
        _results.clear();
-       _is_thread_created.clear();
+       _runQueueManager->clear();
 }
 
 }