task_manager: add runtime pipeline configuration support 45/319745/10
authorInki Dae <inki.dae@samsung.com>
Thu, 31 Oct 2024 07:20:25 +0000 (16:20 +0900)
committerVibhav Aggarwal <v.aggarwal@samsung.com>
Thu, 28 Nov 2024 06:37:14 +0000 (15:37 +0900)
Add runtime pipeline configuration support.

With this patch, we can change inference pipeline in runtime according to
use case, and also bypass a particular node/s in runtime.

The runtime pipeline configuration feature is accomplished by adding
resetPipeline and addEdge member functions to TaskManager, and this
configuration is performed by below three steps.

1. Create node to be used for service scenario by calling requestNewNode()
2. reset current pipeline by calling resetPipeline()
3. Connect nodes with a edge by calling addEdge("node a", "node b")
   it means,
       next node of "node_a" is "node_b"
       "node_b" has "node_a" dependency

After that, we can run the pipeline which is configured with our desired
graph.

As for the the node bypass, it  is needed to bypass a particular node/s
in runtime according to use case.
For example, there is service api which uses below a pipeline,
face detection -- bridge -- face landmark -- bridge -- gaze tracker -- endpoint
 (latency 1s)              (latency 100ms)            (latency 30ms)

In this pipeline, performing all nodes of this pipeline needs big latency
because three deep learning models should be performed every time a new
inference is requested. So it will take 1s 130ms for each input.
What is the best if face detection is required only one time?

In this case, the pipeline can be performed within 130ms if we can bypass
face detection node in runtime.

This patch provides a task manager interface to bypass a given node
in runtime, and this allows us to seamlessly integrate these modifications
into the existing workflow without disrupting the overall architecture of
the pipeline and service API implementation.

Change-Id: Ic8540d414560b1cda24cd0f1aee8841d62efb67b
Signed-off-by: Inki Dae <inki.dae@samsung.com>
12 files changed:
services/task_manager/include/BranchNode.h
services/task_manager/include/BridgeNode.h
services/task_manager/include/CallbackNode.h
services/task_manager/include/INode.h
services/task_manager/include/StartpointNode.h [new file with mode: 0644]
services/task_manager/include/TaskManager.h
services/task_manager/include/TaskNode.h
services/task_manager/include/TrainingNode.h
services/task_manager/src/CallbackNode.cpp
services/task_manager/src/InferenceNode.cpp
services/task_manager/src/TaskManager.cpp
services/task_manager/src/TaskNode.cpp

index 831adb2a8e0f2832016b52d0c1ba7ce6f83ba5a6..fb714eb92c285bd182b20b71563a55c015aca6f8 100644 (file)
@@ -37,7 +37,7 @@ public:
 
        void configure() final
        {
-               // To be updated.
+               // TODO.
        }
 
        void enableNode(const std::string &name)
index bfcb83337213fe323e3a9dd54aa28909f8e77c4b..c1d17eb2a23915d601d70f6dd488664e3288f157 100644 (file)
@@ -36,7 +36,7 @@ public:
 
        void configure() final
        {
-               // To be updated.
+               // TODO.
        }
 
        void invoke() final
index c4bac80f4418d176585934ad8ebee7adeaaf14b7..7389dc2c54e38e833e7d7725178b05dc4078292b 100644 (file)
@@ -58,6 +58,7 @@ public:
        {
                return _name;
        }
+       void setBypass(bool bypass, bool clearResult) { exception::InvalidOperation("Bypass not supported."); }
        void setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer) override;
        std::shared_ptr<SharedBuffer> &getInputBuffer() override;
        void addDependency(INode *node) override;
@@ -70,6 +71,7 @@ public:
        virtual void configure() = 0;
        virtual void invoke() = 0;
        void clear() override;
+       void clearDependency() override;
        void setCb(const NodeCb &cb);
        std::vector<std::shared_ptr<BaseResultType> > &results() override;
 };
index bdcc2a87edb82da7d53769872afc21ae4421d0d9..1bf9aca2c1cb32983ffbabd1eaf1aa19a76c81b3 100644 (file)
@@ -29,9 +29,10 @@ namespace singleo
 {
 namespace services
 {
-enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, BRANCH, ENDPOINT };
+enum class NodeType { NONE, STARTPOINT, INFERENCE, TRAINING, BRIDGE, BRANCH, ENDPOINT };
 
 enum class NodeStatus { NONE, VALID, INVALID };
+
 class INode
 {
 public:
@@ -55,6 +56,8 @@ public:
        virtual void wait() = 0;
        virtual void wakeup() = 0;
        virtual void clear() = 0;
+       virtual void clearDependency() = 0;
+       virtual void setBypass(bool bypass, bool clearResult) = 0;
 };
 
 using NodeCb = std::function<void(INode *node)>;
diff --git a/services/task_manager/include/StartpointNode.h b/services/task_manager/include/StartpointNode.h
new file mode 100644 (file)
index 0000000..c756315
--- /dev/null
@@ -0,0 +1,55 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STARTPOINT_NODE_H__
+#define __STARTPOINT_NODE_H__
+
+#include "TaskNode.h"
+
+namespace singleo
+{
+namespace services
+{
+class StartpointNode : public TaskNode
+{
+protected:
+       std::vector<std::shared_ptr<BaseResultType> > _results;
+
+public:
+       explicit StartpointNode(const std::string &name)
+       {
+               _name = name;
+               _type = NodeType::STARTPOINT;
+       }
+
+       virtual ~StartpointNode() = default;
+
+       void configure() final
+       {}
+
+       void invoke() final
+       {}
+
+       std::vector<std::shared_ptr<BaseResultType> > &results()
+       {
+               return _results;
+       }
+};
+
+}
+}
+
+#endif
\ No newline at end of file
index 4c8002356613c2b5d25637be21d7751d2c950b99..29bbf58f998384da107160040e26934729bc96cc 100644 (file)
@@ -37,6 +37,7 @@ class TaskManager
 private:
        std::vector<std::shared_ptr<BaseDataType> > _inputs;
        std::vector<INode *> _nodes;
+       std::map<std::string, INode *> _nodes_map;
        std::vector<std::shared_ptr<BaseResultType> > _results;
        std::queue<std::shared_ptr<std::thread> > _threads;
        std::unordered_set<INode *> _is_thread_created;
@@ -54,10 +55,13 @@ public:
 
        INode *requestNewNode(NodeType type, std::string nodeName);
        void addInput(BaseDataType &input);
+       void setBypass(const std::string &node_name, bool bypass = true, bool clearResult = false);
        std::vector<std::shared_ptr<BaseDataType> > &getInputs();
        void run();
        std::vector<std::shared_ptr<BaseResultType> > &output();
        void clear();
+       void resetPipeline();
+       void addEdge(const std::string &node_a_name, const std::string &node_b_name);
 };
 
 }
index 8a915d89f3c9643a0a8c11a3d95ef84b47ec0206..5a02563209e3a54e8cdbec22dcec34c933ca2c2a 100644 (file)
@@ -41,6 +41,8 @@ protected:
        bool _completed { false };
        std::condition_variable _event;
        std::mutex _mutex;
+       bool _bypass { false };
+       bool _clearResult { false };
 
        bool isNodeDuplicated(const std::vector<INode *> &target, const INode *newNode);
        void addNext(INode *node) override;
@@ -56,6 +58,7 @@ public:
        {
                return _name;
        }
+       void setBypass(bool bypass, bool clearResult) override;
        void setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer) override;
        std::shared_ptr<SharedBuffer> &getInputBuffer() override;
        void addDependency(INode *node) override;
@@ -68,6 +71,7 @@ public:
        virtual void configure() = 0;
        virtual void invoke() = 0;
        void clear() override;
+       void clearDependency() override;
        virtual std::vector<std::shared_ptr<BaseResultType> > &results() = 0;
 };
 
index edb815e4f52a70910902165c42f91bcbea0a41af..720c2714dc529348d87871b3b7066c907de70d7b 100644 (file)
@@ -44,7 +44,9 @@ public:
 
        void configure() final
        {
-               // TODO. implement configure here.
+               // In default, do not bypass training node.
+               _bypass = false;
+
        }
 
        void invoke() final
index f1ce185868f4ab7c994656b721201c655a3dc939..56b06fad582ef9ea2b0d09defe51840fe4c40df7 100644 (file)
@@ -72,7 +72,6 @@ void CallbackNode::addDependency(INode *node)
        }
 
        _dependencies.push_back(node);
-       node->addNext(this);
 }
 
 std::vector<INode *> &CallbackNode::getDependencies()
@@ -130,6 +129,12 @@ void CallbackNode::wakeup()
        _event.notify_all();
 }
 
+void CallbackNode::clearDependency()
+{
+       _dependencies.clear();
+       _nexts.clear();
+}
+
 void CallbackNode::clear()
 {
        _inputBuffer = nullptr;
index e5dc786b4f287addb414b65ecdb9abf4881fdb90..acedf20e5c8e037d89171ba3402f1b1f758b5a56 100644 (file)
  */
 
 #include "SingleoLog.h"
+#include "SingleoException.h"
 #include "InferenceNode.h"
 #include "CallbackNode.h"
 
 using namespace std;
 using namespace singleo::inference;
+using namespace singleo::exception;
 
 namespace singleo
 {
@@ -36,10 +38,22 @@ void InferenceNode::configure()
 {
        _task->configure();
        _task->prepare();
+
+       // In default, do not bypass inference node.
+       _bypass = false;
 }
 
 void InferenceNode::invoke()
 {
+       if (_bypass) {
+               _status = NodeStatus::VALID;
+
+               if (_clearResult)
+                       _results.clear();
+
+               return;
+       }
+
        _results.clear();
 
        if (_status == NodeStatus::INVALID)
index 963a0bec7eed1e07d575e2290c87b135680a03b5..7039306c97d136e3f804ce18da8c4181ab183f32 100644 (file)
@@ -22,6 +22,7 @@
 #include "SingleoException.h"
 #include "SingleoLog.h"
 #include "TaskManager.h"
+#include "StartpointNode.h"
 #include "InferenceNode.h"
 #include "BridgeNode.h"
 #include "BranchNode.h"
@@ -41,6 +42,9 @@ INode *TaskManager::requestNewNode(NodeType type, std::string nodeName)
 
        try {
                switch (type) {
+               case NodeType::STARTPOINT:
+                       node = new StartpointNode(nodeName);
+                       break;
                case NodeType::BRANCH:
                        node = new BranchNode(nodeName);
                        break;
@@ -72,6 +76,20 @@ INode *TaskManager::requestNewNode(NodeType type, std::string nodeName)
        return node;
 }
 
+void TaskManager::setBypass(const std::string &node_name, bool bypass, bool clearResult)
+{
+       if (_nodes.empty())
+               return;
+
+       auto &node = _nodes_map[node_name];
+       if (!node) {
+               SINGLEO_LOGE("Node(%s) not found.", node_name.c_str());
+               throw InvalidParameter("Node not found");
+       }
+
+       node->setBypass(bypass, clearResult);
+}
+
 void TaskManager::threadCb(INode *node)
 {
        // Wait until all nodes added to this node as dependency are completed
@@ -154,30 +172,14 @@ bool TaskManager::isNodeDuplicated(const vector<INode *> &target, const INode *n
 
 void TaskManager::addNode(INode *node)
 {
-       if (isNodeDuplicated(_nodes, node)) {
-               SINGLEO_LOGE("A given node has already been registered.");
-               throw InvalidOperation("A given node has already been registered.");
-       }
+       if (isNodeDuplicated(_nodes, node))
+               return;
 
-       _nodes.push_back(node);
+       _nodes_map[node->getName()] = node;
 }
 
 void TaskManager::verifyGraph()
 {
-       // Start node should be a task node such as InferenceNode and TrainingNode.
-       if (_nodes.front()->getType() != NodeType::INFERENCE && _nodes.front()->getType() != NodeType::TRAINING) {
-               SINGLEO_LOGE("The first node should be a task node such as InferenceNode or TrainingNode.");
-               throw InvalidOperation("The first node should be a task node such as InferenceNode or TrainingNode.");
-       }
-
-       auto &lastNode = _nodes.back();
-
-       // Last node should be EndpointNode.
-       if (lastNode->getType() != NodeType::ENDPOINT) {
-               SINGLEO_LOGE("The last node should be a EndpointNode.");
-               throw InvalidOperation("The last node should be a EndpointNode.");
-       }
-
        // Check if node names are unique
        unordered_set<string> node_names;
        for (auto &n : _nodes) {
@@ -194,7 +196,7 @@ void TaskManager::verifyGraph()
        // Verify graph rule.
        for (auto &node : _nodes) {
                if (node->getNexts().size() == 0 && node->getType() != NodeType::ENDPOINT) {
-                       SINGLEO_LOGE("All nodes except endpoint node must have a next node.");
+                       SINGLEO_LOGE("All nodes except endpoint node must have a next node.(%s)", node->getName().c_str());
                        throw InvalidOperation("All nodes except endpoint node must have a next node.");
                }
 
@@ -267,7 +269,7 @@ void TaskManager::verifyGraph()
        }
 
        if (visitedCount != _nodes.size()) {
-               SINGLEO_LOGE("Invalid graph.");
+               SINGLEO_LOGE("Invalid graph.(%zu!= %zu)", visitedCount, _nodes.size());
                throw InvalidOperation("Invalid graph.");
        }
 }
@@ -279,6 +281,7 @@ void TaskManager::run()
                throw InvalidOperation("No input source.");
        }
 
+       // TODO. move below function to other API. It isn't needed to call them every time run is called.
        verifyGraph();
 
        auto inputBuffer = make_shared<SharedBuffer>();
@@ -287,6 +290,10 @@ void TaskManager::run()
                inputBuffer->addInput(i);
 
        for (auto &n : _nodes) {
+               // Do not run startpoint node which is just empty node.
+               if (n->getType() == NodeType::STARTPOINT)
+                       continue;
+
                // Set input as source of current node.
                // If no dependency then this node has to receive input source
                // from singleo concrete service such as Autozoom, else if dependency
@@ -343,11 +350,44 @@ vector<shared_ptr<BaseResultType> > &TaskManager::output()
        return _results;
 }
 
+void TaskManager::resetPipeline()
+{
+       // Clear node dependency of all nodes first to reconfigure the pipeline.
+       for (auto &node : _nodes)
+               node->clearDependency();
+
+       _nodes.clear();
+}
+
+void TaskManager::addEdge(const string &node_a_name, const string &node_b_name)
+{
+       auto &node_a = _nodes_map[node_a_name];
+       auto &node_b = _nodes_map[node_b_name];
+
+       if (!node_a || !node_b) {
+               SINGLEO_LOGE("Node not found.");
+               throw InvalidParameter("Node not found");
+       }
+
+       if (node_a->getType() != NodeType::STARTPOINT)
+               node_b->addDependency(node_a);
+       node_a->addNext(node_b);
+
+       if (!isNodeDuplicated(_nodes, node_a))
+               _nodes.push_back(node_a);
+
+       if (!isNodeDuplicated(_nodes, node_b))
+               _nodes.push_back(node_b);
+}
+
 void TaskManager::clear()
 {
        _inputs.clear();
-       for (auto node : _nodes)
+       for (auto node : _nodes) {
+               node->clear();
+               node->clearDependency();
                delete node;
+       }
        _nodes.clear();
        _results.clear();
        _is_thread_created.clear();
index ad7d8a2bce581fa30a45bdf6e3cd2e85d15558e5..ad3f6c0863f0d933f30c012c517487090c5fe1bc 100644 (file)
@@ -41,6 +41,12 @@ void TaskNode::setStatus(NodeStatus status)
        _status = status;
 }
 
+void TaskNode::setBypass(bool bypass, bool clearResult)
+{
+       _bypass = bypass;
+       _clearResult = clearResult;
+}
+
 void TaskNode::setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer)
 {
        inputBuffer->addRef();
@@ -66,7 +72,6 @@ void TaskNode::addDependency(INode *node)
        }
 
        _dependencies.push_back(node);
-       node->addNext(this);
 }
 
 std::vector<INode *> &TaskNode::getDependencies()
@@ -119,6 +124,12 @@ void TaskNode::wakeup()
        _event.notify_all();
 }
 
+void TaskNode::clearDependency()
+{
+       _dependencies.clear();
+       _nexts.clear();
+}
+
 void TaskNode::clear()
 {
        _inputBuffer = nullptr;