add RunQueueManager task_manager_wip
authorInki Dae <inki.dae@samsung.com>
Fri, 14 Jun 2024 03:11:28 +0000 (12:11 +0900)
committerInki Dae <inki.dae@samsung.com>
Fri, 14 Jun 2024 06:04:33 +0000 (15:04 +0900)
Change-Id: I7c448aa5ae579b2f61ec2e5b765789f2e82966ad
Signed-off-by: Inki Dae <inki.dae@samsung.com>
13 files changed:
services/auto_zoom/src/AutoZoom.cpp
services/common/include/AsyncManager.h
services/task_manager/include/BridgeNode.h
services/task_manager/include/CallbackNode.h
services/task_manager/include/EndpointNode.h
services/task_manager/include/INode.h
services/task_manager/include/InferenceNode.h
services/task_manager/include/RunQueueManager.h [new file with mode: 0644]
services/task_manager/include/TaskManager.h
services/task_manager/include/TaskNode.h
services/task_manager/include/TrainingNode.h
services/task_manager/src/InferenceNode.cpp
services/task_manager/src/TaskManager.cpp

index ae5f9c0246ef37e352ff2e42078f4fc612fa67fb..0551744368d9985ce89c6da3ba48c8ad855cdc27 100644 (file)
@@ -144,8 +144,10 @@ void AutoZoom::configure(InputConfigBase &config)
 
 AutoZoom::~AutoZoom()
 {
-       if (_async_mode)
+       if (_async_mode) {
                _input_service->streamOff();
+               _async_manager->clear();
+       }
 
        _taskManager->clear();
 }
index a03a0731608a0cfaeb7b12117c3702e76815f86f..e45d06650f3db93c02f61868702a3cc295351ec2 100644 (file)
@@ -118,7 +118,9 @@ public:
                _thread_handle = std::make_unique<std::thread>(&AsyncManager::invokeThread, this);
        }
 
-       ~AsyncManager()
+       ~AsyncManager() = default;
+
+       void clear()
        {
                _exit_thread = true;
                _thread_handle->join();
index b1f877a0a9765454236b6cd20263975403811466..abb75515ddb1291dd4d21cce1bcb047ff3a8efc5 100644 (file)
@@ -57,14 +57,26 @@ public:
                _inputBuffer->release();
                _enabled = false;
 
-               // Bridge node got the result from previous task node so enable this bridge node.
-               if (_outputBuffer)
-                       _enabled = true;
+               if (!_outputBuffer)
+                       return;
+
+               _enabled = true;
        }
 
-       bool isEnabled()
+       void updateRunQueue(RunQueueManager *runQueueManager) final
        {
-               return _enabled;
+               // If this bridge node failed then wakeup all next nodes because
+               // other nodes can wait for them to finish. In this case,
+               // the next nodes aren't invoked.
+               if (!_enabled) {
+                       for (const auto &n : _nexts)
+                               n->wakeup();
+
+                       return;
+               }
+
+               for (auto &n : _nexts)
+                       runQueueManager->pushNode(n);
        }
 };
 
index e98333db768c3a7494f8240283fabb2d197bf019..ac32a19d2bcbf855ca7cde2fef15785cb3f308b0 100644 (file)
@@ -18,6 +18,7 @@
 #define __CALLBACK_NODE_H__
 
 #include <mutex>
+#include <queue>
 #include <condition_variable>
 
 #include "INode.h"
@@ -69,6 +70,7 @@ public:
        void clear() override;
        void setCb(const NodeCb &cb);
        std::vector<std::shared_ptr<BaseResultType> > &results() override;
+       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
 };
 
 }
index 3ef78cb1f4e24a2c7fc5794faba4885360714206..82c00570249faba639c48a2614b99420fe989950 100644 (file)
@@ -50,6 +50,9 @@ public:
 
                _inputBuffer->release();
        }
+
+       void updateRunQueue(RunQueueManager *runQueueManager) override
+       {}
 };
 
 }
index e15604d30fedb1daa8c25cc2d2e06b4b1829c9a1..5cc04ce4354b81f0a25a0052354f385313352934 100644 (file)
@@ -30,6 +30,7 @@ namespace singleo
 namespace services
 {
 enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, ENDPOINT };
+class RunQueueManager;
 
 class INode
 {
@@ -52,6 +53,7 @@ public:
        virtual void wait() = 0;
        virtual void wakeup() = 0;
        virtual void clear() = 0;
+       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
 };
 
 using NodeCb = std::function<void(INode *node)>;
index 82fb707b56161db2bfb5a41d7862da368a3aac76..c5761c97e7789a5bd843ae8fe1d28ed49cdf5176 100644 (file)
@@ -18,6 +18,7 @@
 #define __INFERENCE_NODE_H__
 
 #include <mutex>
+#include <queue>
 #include <condition_variable>
 
 #include "TaskNode.h"
@@ -47,6 +48,7 @@ public:
        void configure() final;
        void invoke() final;
        std::vector<std::shared_ptr<BaseResultType> > &results() final;
+       void updateRunQueue(RunQueueManager *runQueueManager) override;
 };
 
 }
diff --git a/services/task_manager/include/RunQueueManager.h b/services/task_manager/include/RunQueueManager.h
new file mode 100644 (file)
index 0000000..260ce8c
--- /dev/null
@@ -0,0 +1,91 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RUN_QUEUE_MANAGER_H__
+#define __RUN_QUEUE_MANAGER_H__
+
+#include <queue>
+#include <unordered_set>
+#include <memory>
+#include <mutex>
+
+#include "INode.h"
+#include "SingleoLog.h"
+
+namespace singleo
+{
+namespace services
+{
+class RunQueueManager
+{
+private:
+       std::unordered_set<std::shared_ptr<INode> > _uniqueRunNodes;
+       std::queue<std::shared_ptr<INode> > _runQueue;
+       std::mutex _runQueueMutex;
+
+       bool isDuplicated(std::shared_ptr<INode> &node)
+       {
+               if (_uniqueRunNodes.find(node) != _uniqueRunNodes.end())
+                       return true;
+
+               return false;
+       }
+
+public:
+       void pushNode(std::shared_ptr<INode> &node)
+       {
+               std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+               if (isDuplicated(node))
+                       return;
+
+               _runQueue.push(node);
+               _uniqueRunNodes.insert(node);
+       }
+
+       std::shared_ptr<INode> &popNode()
+       {
+               std::lock_guard<std::mutex> lock(_runQueueMutex);
+               auto &node = _runQueue.front();
+
+               _runQueue.pop();
+
+               return node;
+       }
+
+       bool isEmpty()
+       {
+               std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+               return _runQueue.empty();
+       }
+
+       void clear()
+       {
+               std::lock_guard<std::mutex> lock(_runQueueMutex);
+
+               _uniqueRunNodes.clear();
+       }
+
+public:
+       RunQueueManager() = default;
+       ~RunQueueManager() = default;
+};
+
+}
+}
+
+#endif
\ No newline at end of file
index 16761a0e192723df2b8e1fa57dcdd2bba8f9e221..34f303e2abb438584ecba6ea7bebcb36ef8a6cb1 100644 (file)
@@ -27,6 +27,7 @@
 #include "IInferenceTaskInterface.h"
 #include "SingleoCommonTypes.h"
 #include "INode.h"
+#include "RunQueueManager.h"
 
 namespace singleo
 {
@@ -39,9 +40,8 @@ private:
        std::vector<std::shared_ptr<INode> > _nodes;
        std::vector<std::shared_ptr<BaseResultType> > _results;
        std::queue<std::shared_ptr<std::thread> > _threads;
-       std::unordered_set<std::shared_ptr<INode> > _is_thread_created;
-
        std::mutex _thread_mutex;
+       std::unique_ptr<RunQueueManager> _runQueueManager;
 
        void threadCb(std::shared_ptr<INode> &node);
        void verifyGraph();
index c32fa21430e01075b7e2a5b4521485e808bf3e34..65218290a1f08bb1d4be06856c33f0d6e7af2d96 100644 (file)
@@ -18,6 +18,7 @@
 #define __TASK_NODE_H__
 
 #include <mutex>
+#include <queue>
 #include <condition_variable>
 
 #include "INode.h"
@@ -66,6 +67,7 @@ public:
        virtual void invoke() = 0;
        void clear() override;
        virtual std::vector<std::shared_ptr<BaseResultType> > &results() = 0;
+       virtual void updateRunQueue(RunQueueManager *runQueueManager) = 0;
 };
 
 }
index 97ed21c1f03e48fce0b624b4529120b7147896ca..a077b4d4576a91b5a3522686f4ab71520fbf62ba 100644 (file)
@@ -18,6 +18,7 @@
 #define __TRAINING_NODE_H__
 
 #include <mutex>
+#include <queue>
 #include <condition_variable>
 
 #include "TaskNode.h"
@@ -55,6 +56,9 @@ public:
        {
                // TODO. implement results here.
        }
+
+       void updateRunQueue(RunQueueManager *runQueueManager) override
+       {}
 };
 
 }
index a04176ba08bc8c450b44cb3535494dcbe5287e21..ef0c6bdfac354b2cf3a222ba656ebca085d7101f 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "SingleoLog.h"
 #include "InferenceNode.h"
+#include "RunQueueManager.h"
 
 using namespace std;
 using namespace singleo::inference;
@@ -58,5 +59,11 @@ std::vector<std::shared_ptr<BaseResultType> > &InferenceNode::results()
        return _results;
 }
 
+void InferenceNode::updateRunQueue(RunQueueManager *runQueueManager)
+{
+       for (auto &n : _nexts)
+               runQueueManager->pushNode(n);
+}
+
 }
 }
\ No newline at end of file
index a2b5b1f37fc7aee54243e076eefaedb0c9eae15f..4fbf5874816c6a069ba164702b5d0a674f74ee18 100644 (file)
@@ -39,8 +39,6 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
        for (auto &d : node->getDependencies())
                d->wait();
 
-       SINGLEO_LOGD("Launched node name = %s", node->getName().c_str());
-
        if (node->getType() == NodeType::INFERENCE) {
                if (_inputs[0]->_data_type != DataType::IMAGE) {
                        SINGLEO_LOGE("Invalid input data type.");
@@ -80,24 +78,15 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
        node->invoke();
        node->wakeup();
 
-       // Spawn threads for next nodes
-       for (auto &n : node->getNexts()) {
-               if (node->getType() == NodeType::BRIDGE) {
-                       auto b_node = dynamic_pointer_cast<BridgeNode>(node);
+       node->updateRunQueue(_runQueueManager.get());
 
-                       // In case of BRIDGE node, if this bridge node didn't get the result from previous task node,
-                       // isEnabled() is false. So if isEnabled() is false, stop all sub graph pipelines connected to this node.
-                       if (!b_node->isEnabled()) {
-                               n->wakeup();
-                               continue;
-                       }
-               }
+       while (!_runQueueManager->isEmpty()) {
+               auto &n = _runQueueManager->popNode();
 
-               std::lock_guard<std::mutex> lock(_thread_mutex);
-               if (_is_thread_created.find(n) == _is_thread_created.end()) {
-                       _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
-                       _is_thread_created.insert(n);
-               }
+               std::unique_lock<std::mutex> lock(_thread_mutex);
+
+               _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+               _thread_mutex.unlock();
        }
 }
 
@@ -236,6 +225,8 @@ void TaskManager::run()
 
        auto inputBuffer = make_shared<SharedBuffer>();
 
+       _runQueueManager = make_unique<RunQueueManager>();
+
        for (auto &i : _inputs)
                inputBuffer->addInput(i);
 
@@ -250,27 +241,29 @@ void TaskManager::run()
                                throw InvalidOperation("root node should be inference node.");
                        }
 
-                       // TODO. consider for multiple sources later.
-                       n->setInputBuffer(inputBuffer);
-                       _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
-                       _is_thread_created.insert(n);
+                       _runQueueManager->pushNode(n);
                }
        }
 
+       while (!_runQueueManager->isEmpty()) {
+               auto &n = _runQueueManager->popNode();
+
+               n->setInputBuffer(inputBuffer);
+               _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+       }
+
        while (true) {
                std::unique_lock<std::mutex> lock(_thread_mutex);
                if (_threads.empty())
                        break;
 
                auto t = _threads.front();
-               _threads.pop();
 
+               _threads.pop();
                lock.unlock();
-
                t->join();
        }
 
-       _is_thread_created.clear();
        _inputs.clear();
 
        // the result has been returned to user so clear each node.
@@ -298,7 +291,7 @@ void TaskManager::clear()
        _inputs.clear();
        _nodes.clear();
        _results.clear();
-       _is_thread_created.clear();
+       _runQueueManager->clear();
 }
 
 }