Add runtime pipeline configuration support.
With this patch, we can change inference pipeline in runtime according to
use case, and also bypass a particular node/s in runtime.
The runtime pipeline configuration feature is accomplished by adding
resetPipeline and addEdge member functions to TaskManager, and this
configuration is performed by below three steps.
1. Create node to be used for service scenario by calling requestNewNode()
2. reset current pipeline by calling resetPipeline()
3. Connect nodes with a edge by calling addEdge("node a", "node b")
it means,
next node of "node_a" is "node_b"
"node_b" has "node_a" dependency
After that, we can run the pipeline which is configured with our desired
graph.
As for the the node bypass, it is needed to bypass a particular node/s
in runtime according to use case.
For example, there is service api which uses below a pipeline,
face detection -- bridge -- face landmark -- bridge -- gaze tracker -- endpoint
(latency 1s) (latency 100ms) (latency 30ms)
In this pipeline, performing all nodes of this pipeline needs big latency
because three deep learning models should be performed every time a new
inference is requested. So it will take 1s 130ms for each input.
What is the best if face detection is required only one time?
In this case, the pipeline can be performed within 130ms if we can bypass
face detection node in runtime.
This patch provides a task manager interface to bypass a given node
in runtime, and this allows us to seamlessly integrate these modifications
into the existing workflow without disrupting the overall architecture of
the pipeline and service API implementation.
Change-Id: Ic8540d414560b1cda24cd0f1aee8841d62efb67b
Signed-off-by: Inki Dae <inki.dae@samsung.com>
void configure() final
{
- // To be updated.
+ // TODO.
}
void enableNode(const std::string &name)
void configure() final
{
- // To be updated.
+ // TODO.
}
void invoke() final
{
return _name;
}
+ void setBypass(bool bypass, bool clearResult) { exception::InvalidOperation("Bypass not supported."); }
void setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer) override;
std::shared_ptr<SharedBuffer> &getInputBuffer() override;
void addDependency(INode *node) override;
virtual void configure() = 0;
virtual void invoke() = 0;
void clear() override;
+ void clearDependency() override;
void setCb(const NodeCb &cb);
std::vector<std::shared_ptr<BaseResultType> > &results() override;
};
{
namespace services
{
-enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, BRANCH, ENDPOINT };
+enum class NodeType { NONE, STARTPOINT, INFERENCE, TRAINING, BRIDGE, BRANCH, ENDPOINT };
enum class NodeStatus { NONE, VALID, INVALID };
+
class INode
{
public:
virtual void wait() = 0;
virtual void wakeup() = 0;
virtual void clear() = 0;
+ virtual void clearDependency() = 0;
+ virtual void setBypass(bool bypass, bool clearResult) = 0;
};
using NodeCb = std::function<void(INode *node)>;
--- /dev/null
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STARTPOINT_NODE_H__
+#define __STARTPOINT_NODE_H__
+
+#include "TaskNode.h"
+
+namespace singleo
+{
+namespace services
+{
+class StartpointNode : public TaskNode
+{
+protected:
+ std::vector<std::shared_ptr<BaseResultType> > _results;
+
+public:
+ explicit StartpointNode(const std::string &name)
+ {
+ _name = name;
+ _type = NodeType::STARTPOINT;
+ }
+
+ virtual ~StartpointNode() = default;
+
+ void configure() final
+ {}
+
+ void invoke() final
+ {}
+
+ std::vector<std::shared_ptr<BaseResultType> > &results()
+ {
+ return _results;
+ }
+};
+
+}
+}
+
+#endif
\ No newline at end of file
private:
std::vector<std::shared_ptr<BaseDataType> > _inputs;
std::vector<INode *> _nodes;
+ std::map<std::string, INode *> _nodes_map;
std::vector<std::shared_ptr<BaseResultType> > _results;
std::queue<std::shared_ptr<std::thread> > _threads;
std::unordered_set<INode *> _is_thread_created;
INode *requestNewNode(NodeType type, std::string nodeName);
void addInput(BaseDataType &input);
+ void setBypass(const std::string &node_name, bool bypass = true, bool clearResult = false);
std::vector<std::shared_ptr<BaseDataType> > &getInputs();
void run();
std::vector<std::shared_ptr<BaseResultType> > &output();
void clear();
+ void resetPipeline();
+ void addEdge(const std::string &node_a_name, const std::string &node_b_name);
};
}
bool _completed { false };
std::condition_variable _event;
std::mutex _mutex;
+ bool _bypass { false };
+ bool _clearResult { false };
bool isNodeDuplicated(const std::vector<INode *> &target, const INode *newNode);
void addNext(INode *node) override;
{
return _name;
}
+ void setBypass(bool bypass, bool clearResult) override;
void setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer) override;
std::shared_ptr<SharedBuffer> &getInputBuffer() override;
void addDependency(INode *node) override;
virtual void configure() = 0;
virtual void invoke() = 0;
void clear() override;
+ void clearDependency() override;
virtual std::vector<std::shared_ptr<BaseResultType> > &results() = 0;
};
void configure() final
{
- // TODO. implement configure here.
+ // In default, do not bypass training node.
+ _bypass = false;
+
}
void invoke() final
}
_dependencies.push_back(node);
- node->addNext(this);
}
std::vector<INode *> &CallbackNode::getDependencies()
_event.notify_all();
}
+void CallbackNode::clearDependency()
+{
+ _dependencies.clear();
+ _nexts.clear();
+}
+
void CallbackNode::clear()
{
_inputBuffer = nullptr;
*/
#include "SingleoLog.h"
+#include "SingleoException.h"
#include "InferenceNode.h"
#include "CallbackNode.h"
using namespace std;
using namespace singleo::inference;
+using namespace singleo::exception;
namespace singleo
{
{
_task->configure();
_task->prepare();
+
+ // In default, do not bypass inference node.
+ _bypass = false;
}
void InferenceNode::invoke()
{
+ if (_bypass) {
+ _status = NodeStatus::VALID;
+
+ if (_clearResult)
+ _results.clear();
+
+ return;
+ }
+
_results.clear();
if (_status == NodeStatus::INVALID)
#include "SingleoException.h"
#include "SingleoLog.h"
#include "TaskManager.h"
+#include "StartpointNode.h"
#include "InferenceNode.h"
#include "BridgeNode.h"
#include "BranchNode.h"
try {
switch (type) {
+ case NodeType::STARTPOINT:
+ node = new StartpointNode(nodeName);
+ break;
case NodeType::BRANCH:
node = new BranchNode(nodeName);
break;
return node;
}
+void TaskManager::setBypass(const std::string &node_name, bool bypass, bool clearResult)
+{
+ if (_nodes.empty())
+ return;
+
+ auto &node = _nodes_map[node_name];
+ if (!node) {
+ SINGLEO_LOGE("Node(%s) not found.", node_name.c_str());
+ throw InvalidParameter("Node not found");
+ }
+
+ node->setBypass(bypass, clearResult);
+}
+
void TaskManager::threadCb(INode *node)
{
// Wait until all nodes added to this node as dependency are completed
void TaskManager::addNode(INode *node)
{
- if (isNodeDuplicated(_nodes, node)) {
- SINGLEO_LOGE("A given node has already been registered.");
- throw InvalidOperation("A given node has already been registered.");
- }
+ if (isNodeDuplicated(_nodes, node))
+ return;
- _nodes.push_back(node);
+ _nodes_map[node->getName()] = node;
}
void TaskManager::verifyGraph()
{
- // Start node should be a task node such as InferenceNode and TrainingNode.
- if (_nodes.front()->getType() != NodeType::INFERENCE && _nodes.front()->getType() != NodeType::TRAINING) {
- SINGLEO_LOGE("The first node should be a task node such as InferenceNode or TrainingNode.");
- throw InvalidOperation("The first node should be a task node such as InferenceNode or TrainingNode.");
- }
-
- auto &lastNode = _nodes.back();
-
- // Last node should be EndpointNode.
- if (lastNode->getType() != NodeType::ENDPOINT) {
- SINGLEO_LOGE("The last node should be a EndpointNode.");
- throw InvalidOperation("The last node should be a EndpointNode.");
- }
-
// Check if node names are unique
unordered_set<string> node_names;
for (auto &n : _nodes) {
// Verify graph rule.
for (auto &node : _nodes) {
if (node->getNexts().size() == 0 && node->getType() != NodeType::ENDPOINT) {
- SINGLEO_LOGE("All nodes except endpoint node must have a next node.");
+ SINGLEO_LOGE("All nodes except endpoint node must have a next node.(%s)", node->getName().c_str());
throw InvalidOperation("All nodes except endpoint node must have a next node.");
}
}
if (visitedCount != _nodes.size()) {
- SINGLEO_LOGE("Invalid graph.");
+ SINGLEO_LOGE("Invalid graph.(%zu!= %zu)", visitedCount, _nodes.size());
throw InvalidOperation("Invalid graph.");
}
}
throw InvalidOperation("No input source.");
}
+ // TODO. move below function to other API. It isn't needed to call them every time run is called.
verifyGraph();
auto inputBuffer = make_shared<SharedBuffer>();
inputBuffer->addInput(i);
for (auto &n : _nodes) {
+ // Do not run startpoint node which is just empty node.
+ if (n->getType() == NodeType::STARTPOINT)
+ continue;
+
// Set input as source of current node.
// If no dependency then this node has to receive input source
// from singleo concrete service such as Autozoom, else if dependency
return _results;
}
+void TaskManager::resetPipeline()
+{
+ // Clear node dependency of all nodes first to reconfigure the pipeline.
+ for (auto &node : _nodes)
+ node->clearDependency();
+
+ _nodes.clear();
+}
+
+void TaskManager::addEdge(const string &node_a_name, const string &node_b_name)
+{
+ auto &node_a = _nodes_map[node_a_name];
+ auto &node_b = _nodes_map[node_b_name];
+
+ if (!node_a || !node_b) {
+ SINGLEO_LOGE("Node not found.");
+ throw InvalidParameter("Node not found");
+ }
+
+ if (node_a->getType() != NodeType::STARTPOINT)
+ node_b->addDependency(node_a);
+ node_a->addNext(node_b);
+
+ if (!isNodeDuplicated(_nodes, node_a))
+ _nodes.push_back(node_a);
+
+ if (!isNodeDuplicated(_nodes, node_b))
+ _nodes.push_back(node_b);
+}
+
void TaskManager::clear()
{
_inputs.clear();
- for (auto node : _nodes)
+ for (auto node : _nodes) {
+ node->clear();
+ node->clearDependency();
delete node;
+ }
_nodes.clear();
_results.clear();
_is_thread_created.clear();
_status = status;
}
+void TaskNode::setBypass(bool bypass, bool clearResult)
+{
+ _bypass = bypass;
+ _clearResult = clearResult;
+}
+
void TaskNode::setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer)
{
inputBuffer->addRef();
}
_dependencies.push_back(node);
- node->addNext(this);
}
std::vector<INode *> &TaskNode::getDependencies()
_event.notify_all();
}
+void TaskNode::clearDependency()
+{
+ _dependencies.clear();
+ _nexts.clear();
+}
+
void TaskNode::clear()
{
_inputBuffer = nullptr;