task_manager: introduce _status member in each node 39/313639/2
authorVibhav Aggarwal <v.aggarwal@samsung.com>
Fri, 28 Jun 2024 10:53:00 +0000 (19:53 +0900)
committerVibhav Aggarwal <v.aggarwal@samsung.com>
Fri, 28 Jun 2024 11:26:07 +0000 (20:26 +0900)
[Issue type] bug fix

The _status of a node can be NONE, VALID or INVALID.
If all dependencies of a node become INVALID, then current node
will also become INVALID and will not be invoked.

This patch also introduces _is_empty member in the BaseResultType
struct. If some task fails, this member is set to true so that
the corresponding task node can be set to INVALID.

Change-Id: I95a0068ae2ce4ca8e2f2fbf69696b7fe6683863d
Signed-off-by: Vibhav Aggarwal <v.aggarwal@samsung.com>
15 files changed:
common/include/SingleoCommonTypes.h
inference/backends/mediavision/src/MvFaceDetection.cpp
inference/backends/mediavision/src/MvFaceLandmark.cpp
inference/backends/mediavision/src/MvFaceRecognition.cpp
inference/backends/mediavision/src/MvImageClassification.cpp
inference/backends/mediavision/src/MvObjectDetection.cpp
services/task_manager/include/BridgeNode.h
services/task_manager/include/CallbackNode.h
services/task_manager/include/EndpointNode.h
services/task_manager/include/INode.h
services/task_manager/include/TaskNode.h
services/task_manager/src/CallbackNode.cpp
services/task_manager/src/InferenceNode.cpp
services/task_manager/src/TaskManager.cpp
services/task_manager/src/TaskNode.cpp

index 962131aee9df92a84b419166ce9b2a04fef05c8f..b14cacf5a32a238dc71b0c94569db828498d8ecc 100644 (file)
@@ -133,6 +133,7 @@ enum class ResultType { NONE, OBJECT_DETECTION, FACE_DETECTION, FACE_LANDMARK, I
 
 struct BaseResultType {
        ResultType _type { ResultType::NONE };
+       bool _is_empty { true };
        unsigned int _frame_number {};
        BaseResultType(ResultType type) : _type(type)
        {}
index 56f798c10c614995939e2c5a8424dd485ddec258..dfec6ebe76fa5c8766c7e036e84907963362c416 100644 (file)
@@ -94,6 +94,7 @@ BaseResultType &MvFaceDetection::result()
        if (ret != MEDIA_VISION_ERROR_NONE)
                throw runtime_error("Fail to get face detection result count.");
 
+       _output_data._is_empty = result_cnt == 0;
        _output_data._rects.clear();
        _output_data._frame_number = frame_number;
 
index a624c8d0ad1e3076474b3b3621087f00bc2d8d7d..5abf0fdcdfec18153680e3ba9932aa1a3b0b735a 100644 (file)
@@ -94,6 +94,7 @@ BaseResultType &MvFaceLandmark::result()
        if (ret != MEDIA_VISION_ERROR_NONE)
                throw runtime_error("Fail to get face landmark detection result count.");
 
+       _output_data._is_empty = result_cnt == 0;
        _output_data._points.clear();
        _output_data._frame_number = frame_number;
 
index daf6f441877013cc828752a89b084e6fb43667d8..87f37e7fe5ae2d13637ede649c1452231cfd772b 100644 (file)
@@ -88,7 +88,8 @@ BaseResultType &MvFaceRecognition::result()
        const char *out_label {};
 
        int ret = mv_face_recognition_get_label(_handle, &out_label);
-       if (ret != MEDIA_VISION_ERROR_NO_DATA) {
+       if (ret == MEDIA_VISION_ERROR_NO_DATA) {
+               _output_data._is_empty = true;
                _output_data._label = "none";
                return _output_data;
        }
index ca1855b9b9928de5537bca5fe40afce336ec614a..250964b26d2972c96f737fe97569340b0f2845bd 100644 (file)
@@ -94,6 +94,7 @@ BaseResultType &MvImageClassification::result()
        if (ret != MEDIA_VISION_ERROR_NONE)
                throw runtime_error("Fail to get image classification result count.");
 
+       _output_data._is_empty = result_cnt == 0;
        _output_data._labels.clear();
        _output_data._frame_number = frame_number;
 
index d70ab7fcbcd45547cd263b83e2a18f8d889020fd..ef81bb51ee37ee8e4f0d69778039e937995f9998 100644 (file)
@@ -94,6 +94,7 @@ BaseResultType &MvObjectDetection::result()
        if (ret != MEDIA_VISION_ERROR_NONE)
                throw runtime_error("Fail to get object detection result count.");
 
+       _output_data._is_empty = result_cnt == 0;
        _output_data._rects.clear();
        _output_data._frame_number = frame_number;
 
index 078762a973059b5ceb68829b47caee65bfd7e44f..4ec596c5a52eec60e7e7b34b78fdd3b0749a2072 100644 (file)
@@ -26,9 +26,6 @@ namespace services
 {
 class BridgeNode : public CallbackNode
 {
-private:
-       bool _enabled { false };
-
 public:
        explicit BridgeNode(const std::string &name)
        {
@@ -47,6 +44,17 @@ public:
                if (!_cb)
                        throw singleo::exception::InvalidOperation("Bridge node callback is not set");
 
+               _status = NodeStatus::INVALID;
+               // If at least one dependency is valid, bridge node is valid.
+               for (auto &dep : this->getDependencies()) {
+                       if (std::dynamic_pointer_cast<TaskNode>(dep)->getStatus() == NodeStatus::VALID) {
+                               _status = NodeStatus::VALID;
+                               break;
+                       }
+               }
+               if (_status == NodeStatus::INVALID)
+                       return;
+
                _results.clear();
 
                for (const auto &d : _dependencies)
@@ -55,16 +63,11 @@ public:
                _cb(this);
 
                _inputBuffer->release();
-               _enabled = false;
 
+               _status = NodeStatus::INVALID;
                // Bridge node got the result from previous task node so enable this bridge node.
                if (_outputBuffer)
-                       _enabled = true;
-       }
-
-       bool isEnabled()
-       {
-               return _enabled;
+                       _status = NodeStatus::VALID;
        }
 };
 
index e98333db768c3a7494f8240283fabb2d197bf019..a4ae849f19227d0894138c32831b7ef2b27ccf80 100644 (file)
@@ -32,6 +32,7 @@ class CallbackNode : public INode, public std::enable_shared_from_this<CallbackN
 {
 protected:
        NodeType _type { NodeType::NONE };
+       NodeStatus _status { NodeStatus::NONE };
        std::string _name;
        std::vector<std::shared_ptr<INode> > _dependencies;
        std::vector<std::shared_ptr<INode> > _nexts;
@@ -51,6 +52,7 @@ public:
        virtual ~CallbackNode() = default;
 
        NodeType getType() override;
+       NodeStatus getStatus() override;
        std::string &getName() override
        {
                return _name;
index 771b1e284ccfa359ff5a23514e9b62f8dc9a508f..6d137fe494a5cf033e2b616d84979769a2a87462 100644 (file)
@@ -40,6 +40,17 @@ public:
 
        void invoke() final
        {
+               _status = NodeStatus::INVALID;
+               // If at least one dependency is valid, endpoint node is valid.
+               for (auto &dep : this->getDependencies()) {
+                       if (std::dynamic_pointer_cast<TaskNode>(dep)->getStatus() == NodeStatus::VALID) {
+                               _status = NodeStatus::VALID;
+                               break;
+                       }
+               }
+               if (_status == NodeStatus::INVALID)
+                       return;
+
                _results.clear();
 
                for (auto &d : _dependencies)
index e15604d30fedb1daa8c25cc2d2e06b4b1829c9a1..6aec7bffe64af17f0fad74efe12b99563c3d530f 100644 (file)
@@ -31,12 +31,14 @@ namespace services
 {
 enum class NodeType { NONE, INFERENCE, TRAINING, BRIDGE, ENDPOINT };
 
+enum class NodeStatus { NONE, VALID, INVALID };
 class INode
 {
 public:
        virtual ~INode() {};
 
        virtual NodeType getType() = 0;
+       virtual NodeStatus getStatus() = 0;
        virtual std::string &getName() = 0;
        virtual void setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer) = 0;
        virtual std::shared_ptr<SharedBuffer> &getInputBuffer() = 0;
index c32fa21430e01075b7e2a5b4521485e808bf3e34..26fe4e3e2a64444aee9e7f48dc5c831440a4a260 100644 (file)
@@ -32,6 +32,7 @@ class TaskNode : public INode, public std::enable_shared_from_this<TaskNode>
 {
 protected:
        NodeType _type { NodeType::NONE };
+       NodeStatus _status { NodeStatus::NONE };
        std::string _name;
        std::vector<std::shared_ptr<INode> > _dependencies;
        std::vector<std::shared_ptr<INode> > _nexts;
@@ -49,6 +50,7 @@ public:
        virtual ~TaskNode() = default;
 
        NodeType getType() override;
+       NodeStatus getStatus() override;
        std::string &getName() override
        {
                return _name;
index ed829ac7547ad0615f49b5f9c21acba4c754f6e9..ed3ceff38f85781d54f001bd67c48699e5d0a439 100644 (file)
@@ -32,6 +32,11 @@ NodeType CallbackNode::getType()
        return _type;
 }
 
+NodeStatus CallbackNode::getStatus()
+{
+       return _status;
+}
+
 void CallbackNode::setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer)
 {
        inputBuffer->addRef();
index a04176ba08bc8c450b44cb3535494dcbe5287e21..131e048c24360b84fd8024d6c9f30bdcd3355983 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "SingleoLog.h"
 #include "InferenceNode.h"
+#include "CallbackNode.h"
 
 using namespace std;
 using namespace singleo::inference;
@@ -37,6 +38,13 @@ void InferenceNode::configure()
 
 void InferenceNode::invoke()
 {
+       // Note: Inference node can have at max one dependency.
+       for (auto &dep : this->getDependencies()) {
+               if (dynamic_pointer_cast<CallbackNode>(dep)->getStatus() == NodeStatus::INVALID) {
+                       _status = NodeStatus::INVALID;
+                       return;
+               }
+       }
        auto &inputs = _inputBuffer->getInputs();
 
        // TODO. consider for multiple inputs later.
@@ -47,10 +55,13 @@ void InferenceNode::invoke()
 
        _results.clear();
 
-       _resultMutex.lock();
-       _results.push_back(_task->result().clone());
-
-       _resultMutex.unlock();
+       _status = NodeStatus::INVALID;
+       if (!_task->result()._is_empty) {
+               _resultMutex.lock();
+               _results.push_back(_task->result().clone());
+               _resultMutex.unlock();
+               _status = NodeStatus::VALID;
+       }
 }
 
 std::vector<std::shared_ptr<BaseResultType> > &InferenceNode::results()
index 73b1d6a07a16e3339bb7b5a5210cbbc7d138325c..cb912dd1bbab9bec7966e741576d09a757d5fbd2 100644 (file)
@@ -59,14 +59,15 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
                // If InferenceNode has dependency node then the dependency node must be BridgeNode like above graph.
                // Add output from the BridgeNode to InferenceNode as input.
                if (!node->getDependencies().empty()) {
-                       // TODO. consider for multiple dependencies later.
                        auto &callbackNode = node->getDependencies()[0];
                        auto &outputBuffer = callbackNode->getOutputBuffer();
 
-                       node->setInputBuffer(outputBuffer);
+                       if (outputBuffer) {
+                               node->setInputBuffer(outputBuffer);
 
-                       // output buffer has been shared to node so release it here.
-                       outputBuffer->release();
+                               // output buffer has been shared to node so release it here.
+                               outputBuffer->release();
+                       }
                }
        } else {
                // TODO. consider for mulitple inputs later.
@@ -82,17 +83,6 @@ void TaskManager::threadCb(shared_ptr<INode> &node)
 
        // Spawn threads for next nodes
        for (auto &n : node->getNexts()) {
-               if (node->getType() == NodeType::BRIDGE) {
-                       auto b_node = dynamic_pointer_cast<BridgeNode>(node);
-
-                       // In case of BRIDGE node, if this bridge node didn't get the result from previous task node,
-                       // isEnabled() is false. So if isEnabled() is false, stop all sub graph pipelines connected to this node.
-                       if (!b_node->isEnabled()) {
-                               n->wakeup();
-                               continue;
-                       }
-               }
-
                std::lock_guard<std::mutex> lock(_thread_mutex);
                if (_is_thread_created.find(n) == _is_thread_created.end()) {
                        _threads.push(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
@@ -164,8 +154,8 @@ void TaskManager::verifyGraph()
        // Verify graph rule.
        for (auto &node : _nodes) {
                if (node->getNexts().size() == 0 && node->getType() != NodeType::ENDPOINT) {
-                       SINGLEO_LOGE("Last node should be endpoint node.");
-                       throw InvalidOperation("Last node should be endpoint node.");
+                       SINGLEO_LOGE("All nodes except endpoint node must have a next node.");
+                       throw InvalidOperation("All nodes except endpoint node must have a next node.");
                }
 
                if (node->getType() == NodeType::BRIDGE) {
@@ -182,6 +172,10 @@ void TaskManager::verifyGraph()
                                }
                        }
                } else if (node->getType() == NodeType::INFERENCE || node->getType() == NodeType::TRAINING) {
+                       if (node->getDependencies().size() > 1) {
+                               SINGLEO_LOGE("The inference or training node cannot have more than one dependency.");
+                               throw InvalidOperation("The inference or training node cannot have more than one dependency.");
+                       }
                        // Verify if InferenceNode or TrainingNode is located in front of BridgeNode, behind BridgeNode or in front of EndpointNode.
                        // ... inference or training node ---- bridge node ...
                        // ... bridge node ---- inference or training node ...
index ead1dfec9ee8ba56c169656bc4a476d9f946ab3d..721bfaa44b9ce7c483945f86f9028ee8801b6299 100644 (file)
@@ -31,6 +31,11 @@ NodeType TaskNode::getType()
        return _type;
 }
 
+NodeStatus TaskNode::getStatus()
+{
+       return _status;
+}
+
 void TaskNode::setInputBuffer(std::shared_ptr<SharedBuffer> &inputBuffer)
 {
        inputBuffer->addRef();