add task manager support
authorInki Dae <inki.dae@samsung.com>
Tue, 30 Apr 2024 00:55:59 +0000 (09:55 +0900)
committerInki Dae <inki.dae@samsung.com>
Thu, 2 May 2024 07:25:36 +0000 (16:25 +0900)
Change-Id: I02cc508ee26aad8312629d0c69300f21f8221ba3
Signed-off-by: Inki Dae <inki.dae@samsung.com>
services/CMakeLists.txt
services/task_manager/CMakeLists.txt [new file with mode: 0644]
services/task_manager/include/CallbackTaskNode.h [new file with mode: 0644]
services/task_manager/include/ITaskNode.h [new file with mode: 0644]
services/task_manager/include/InferenceTaskNode.h [new file with mode: 0644]
services/task_manager/include/TaskManager.h [new file with mode: 0644]
services/task_manager/src/CallbackTaskNode.cpp [new file with mode: 0644]
services/task_manager/src/InferenceTaskNode.cpp [new file with mode: 0644]
services/task_manager/src/TaskManager.cpp [new file with mode: 0644]

index c4fb763047da4cb9e2eb20d937852cb6470b0595..e319e6ef49221abea8b5601868ea2d60704f6a47 100644 (file)
@@ -9,6 +9,8 @@ FILE(GLOB SINGLEO_SERVICE_SOURCE_FILES "${PROJECT_SOURCE_DIR}/*.cpp"
 INCLUDE(${ROOT_DIRECTORY}/input/CMakeLists.txt)
 INCLUDE(${ROOT_DIRECTORY}/inference/CMakeLists.txt)
 
+INCLUDE(task_manager/CMakeLists.txt)
+
 IF (${USE_AUTOZOOM_API})
     INCLUDE(auto_zoom/CMakeLists.txt)
 ENDIF()
diff --git a/services/task_manager/CMakeLists.txt b/services/task_manager/CMakeLists.txt
new file mode 100644 (file)
index 0000000..e381720
--- /dev/null
@@ -0,0 +1,8 @@
+SET(SINGLEO_SERVICE_SOURCE_FILES
+    ${SINGLEO_SERVICE_SOURCE_FILES}
+    task_manager/src/TaskManager.cpp
+    task_manager/src/InferenceTaskNode.cpp
+    task_manager/src/CallbackTaskNode.cpp
+)
+
+LIST(APPEND SERVICE_HEADER_LIST ${SERVICE_HEADER_LIST} ${CMAKE_CURRENT_SOURCE_DIR}/task_manager/include)
\ No newline at end of file
diff --git a/services/task_manager/include/CallbackTaskNode.h b/services/task_manager/include/CallbackTaskNode.h
new file mode 100644 (file)
index 0000000..59d08ee
--- /dev/null
@@ -0,0 +1,72 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CALLBACK_TASK_NODE_H__
+#define __CALLBACK_TASK_NODE_H__
+
+#include "ITaskNode.h"
+#include "SingleoException.h"
+
+namespace singleo
+{
+namespace services
+{
+class CallbackTaskNode : public ITaskNode
+{
+private:
+       TaskNodeType _type { TaskNodeType::CB };
+       std::vector<std::shared_ptr<ITaskNode> > _dependencies;
+       TaskNodeCb _cb;
+       std::vector<ImageDataType> _inputs;
+       std::shared_ptr<ImageDataType> _output;
+       void *_user_data { nullptr };
+       bool _completed { false };
+
+public:
+       CallbackTaskNode() = default;
+       virtual ~CallbackTaskNode() = default;
+
+       void setCompleted(bool completed = true) override
+       {
+               _completed = completed;
+       };
+       TaskNodeType getType() override;
+       void addInput(BaseDataType &input) override;
+       std::vector<ImageDataType> &getInputs() override;
+       bool isCompleted() override;
+       void setInferenceService(std::unique_ptr<inference::IInferenceServiceInterface> &&service) override
+       {
+               throw exception::InvalidOperation("Not supported.");
+       }
+       inference::IInferenceServiceInterface *getInfereceService() override
+       {
+               throw exception::InvalidOperation("Not supported.");
+       }
+       void setCb(TaskNodeCb &cb, void *user_data) override;
+       TaskNodeCb getCb() override;
+       void setUserData(void *user_data) override;
+       void *getUserData() override;
+       void addDependency(std::shared_ptr<ITaskNode> &node) override;
+       std::vector<std::shared_ptr<ITaskNode> > &getDependencies() override;
+
+       void setOutput(std::shared_ptr<BaseDataType> output) override;
+       std::shared_ptr<ImageDataType> &getOutput() override;
+};
+
+}
+}
+
+#endif
\ No newline at end of file
diff --git a/services/task_manager/include/ITaskNode.h b/services/task_manager/include/ITaskNode.h
new file mode 100644 (file)
index 0000000..d9bc55b
--- /dev/null
@@ -0,0 +1,62 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __ITASK_NODE_H__
+#define __ITASK_NODE_H__
+
+#include <vector>
+#include <memory>
+#include <functional>
+
+#include "SingleoCommonTypes.h"
+#include "ServiceDataType.h"
+#include "IInferenceServiceInterface.h"
+
+namespace singleo
+{
+namespace services
+{
+enum class TaskNodeType { NONE, INFERENCE, CB };
+
+using TaskNodeCb = std::function<void(void *user_data, std::vector<BaseResultType> &results, BaseDataType &input,
+                                                                         std::shared_ptr<BaseDataType> output)>;
+
+class ITaskNode
+{
+public:
+       virtual ~ITaskNode() {};
+
+       virtual void setCompleted(bool completed = true) = 0;
+       virtual bool isCompleted() = 0;
+       virtual TaskNodeType getType() = 0;
+       virtual void addInput(BaseDataType &input) = 0;
+       virtual std::vector<ImageDataType> &getInputs() = 0;
+       virtual void setInferenceService(std::unique_ptr<inference::IInferenceServiceInterface> &&service) = 0;
+       virtual inference::IInferenceServiceInterface *getInfereceService() = 0;
+       virtual void setCb(TaskNodeCb &cb, void *user_data) = 0;
+       virtual TaskNodeCb getCb() = 0;
+       virtual void setUserData(void *user_data) = 0;
+       virtual void *getUserData() = 0;
+       virtual void addDependency(std::shared_ptr<ITaskNode> &node) = 0;
+       virtual std::vector<std::shared_ptr<ITaskNode> > &getDependencies() = 0;
+       virtual void setOutput(std::shared_ptr<BaseDataType> output) = 0;
+       virtual std::shared_ptr<ImageDataType> &getOutput() = 0;
+};
+
+}
+}
+
+#endif
\ No newline at end of file
diff --git a/services/task_manager/include/InferenceTaskNode.h b/services/task_manager/include/InferenceTaskNode.h
new file mode 100644 (file)
index 0000000..c334838
--- /dev/null
@@ -0,0 +1,76 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __INFERENCE_TASK_NODE_H__
+#define __INFERENCE_TASK_NODE_H__
+
+#include "ITaskNode.h"
+#include "SingleoException.h"
+
+namespace singleo
+{
+namespace services
+{
+class InferenceTaskNode : public ITaskNode
+{
+private:
+       TaskNodeType _type { TaskNodeType::INFERENCE };
+       std::unique_ptr<inference::IInferenceServiceInterface> _service;
+       std::vector<std::shared_ptr<ITaskNode> > _dependencies;
+       std::vector<ImageDataType> _inputs;
+       std::shared_ptr<ImageDataType> _output;
+       bool _completed { false };
+
+public:
+       InferenceTaskNode() = default;
+       virtual ~InferenceTaskNode() = default;
+
+       TaskNodeType getType() override;
+       void addInput(BaseDataType &input) override;
+       std::vector<ImageDataType> &getInputs() override;
+       void setCompleted(bool completed = true) override
+       {
+               _completed = completed;
+       };
+       bool isCompleted() override;
+       void setInferenceService(std::unique_ptr<inference::IInferenceServiceInterface> &&service) override;
+       inference::IInferenceServiceInterface *getInfereceService() override;
+       void setCb(TaskNodeCb &cb, void *user_data) override
+       {
+               throw exception::InvalidOperation("Not supported.");
+       };
+       TaskNodeCb getCb() override
+       {
+               throw exception::InvalidOperation("Not supported.");
+       };
+       void setUserData(void *user_data) override
+       {
+               throw exception::InvalidOperation("Not supported.");
+       };
+       void *getUserData() override
+       {
+               throw exception::InvalidOperation("Not supported.");
+       };
+       void addDependency(std::shared_ptr<ITaskNode> &node) override;
+       std::vector<std::shared_ptr<ITaskNode> > &getDependencies() override;
+       void setOutput(std::shared_ptr<BaseDataType> output) override;
+       std::shared_ptr<ImageDataType> &getOutput() override;
+};
+
+}
+}
+
+#endif
\ No newline at end of file
diff --git a/services/task_manager/include/TaskManager.h b/services/task_manager/include/TaskManager.h
new file mode 100644 (file)
index 0000000..20f3db3
--- /dev/null
@@ -0,0 +1,60 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TASK_MANAGER_H__
+#define __TASK_MANAGER_H__
+
+#include <vector>
+#include <memory>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+
+#include "IInferenceServiceInterface.h"
+#include "SingleoCommonTypes.h"
+#include "ITaskNode.h"
+
+namespace singleo
+{
+namespace services
+{
+class TaskManager
+{
+private:
+       std::vector<ImageDataType> _inputs;
+       std::shared_ptr<inference::IInferenceServiceInterface> _service;
+       std::vector<std::shared_ptr<ITaskNode> > _nodes;
+       std::vector<std::shared_ptr<std::thread> > _threads;
+       std::mutex _mutex;
+       std::condition_variable _event;
+
+       void threadCb(std::shared_ptr<ITaskNode> &node);
+
+public:
+       TaskManager() = default;
+       ~TaskManager() = default;
+
+       void addInput(BaseDataType &input);
+       std::vector<ImageDataType> &getInputs();
+       void addNode(std::shared_ptr<ITaskNode> node);
+       void run();
+       BaseResultType &output();
+};
+
+}
+}
+
+#endif
\ No newline at end of file
diff --git a/services/task_manager/src/CallbackTaskNode.cpp b/services/task_manager/src/CallbackTaskNode.cpp
new file mode 100644 (file)
index 0000000..f7c6945
--- /dev/null
@@ -0,0 +1,87 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CallbackTaskNode.h"
+
+using namespace std;
+
+namespace singleo
+{
+namespace services
+{
+TaskNodeType CallbackTaskNode::getType()
+{
+       return _type;
+}
+
+void CallbackTaskNode::addInput(BaseDataType &input)
+{
+       _inputs.push_back(dynamic_cast<ImageDataType &>(input));
+}
+
+std::vector<ImageDataType> &CallbackTaskNode::getInputs()
+{
+       return _inputs;
+}
+
+bool CallbackTaskNode::isCompleted()
+{
+       return _completed;
+}
+
+void CallbackTaskNode::setCb(TaskNodeCb &cb, void *user_data)
+{
+       _cb = cb;
+       _user_data = user_data;
+}
+
+TaskNodeCb CallbackTaskNode::getCb()
+{
+       return _cb;
+}
+
+void CallbackTaskNode::setUserData(void *user_data)
+{
+       _user_data = user_data;
+}
+
+void *CallbackTaskNode::getUserData()
+{
+       return _user_data;
+}
+
+void CallbackTaskNode::addDependency(std::shared_ptr<ITaskNode> &node)
+{
+       _dependencies.push_back(node);
+}
+
+std::vector<std::shared_ptr<ITaskNode> > &CallbackTaskNode::getDependencies()
+{
+       return _dependencies;
+}
+
+void CallbackTaskNode::setOutput(std::shared_ptr<BaseDataType> output)
+{
+       _output = dynamic_pointer_cast<ImageDataType>(output);
+}
+
+std::shared_ptr<ImageDataType> &CallbackTaskNode::getOutput()
+{
+       return _output;
+}
+
+}
+}
\ No newline at end of file
diff --git a/services/task_manager/src/InferenceTaskNode.cpp b/services/task_manager/src/InferenceTaskNode.cpp
new file mode 100644 (file)
index 0000000..e9a30d7
--- /dev/null
@@ -0,0 +1,77 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "InferenceTaskNode.h"
+
+using namespace std;
+using namespace singleo::inference;
+
+namespace singleo
+{
+namespace services
+{
+TaskNodeType InferenceTaskNode::getType()
+{
+       return _type;
+}
+
+void InferenceTaskNode::addInput(BaseDataType &input)
+{
+       _inputs.push_back(dynamic_cast<ImageDataType &>(input));
+}
+
+std::vector<ImageDataType> &InferenceTaskNode::getInputs()
+{
+       return _inputs;
+}
+
+bool InferenceTaskNode::isCompleted()
+{
+       return _completed;
+}
+
+void InferenceTaskNode::setInferenceService(unique_ptr<inference::IInferenceServiceInterface> &&service)
+{
+       _service = move(service);
+}
+
+IInferenceServiceInterface *InferenceTaskNode::getInfereceService()
+{
+       return _service.get();
+}
+
+void InferenceTaskNode::addDependency(std::shared_ptr<ITaskNode> &node)
+{
+       _dependencies.push_back(node);
+}
+
+std::vector<std::shared_ptr<ITaskNode> > &InferenceTaskNode::getDependencies()
+{
+       return _dependencies;
+}
+
+void InferenceTaskNode::setOutput(std::shared_ptr<BaseDataType> output)
+{
+       _output = dynamic_pointer_cast<ImageDataType>(output);
+}
+
+std::shared_ptr<ImageDataType> &InferenceTaskNode::getOutput()
+{
+       return _output;
+}
+
+}
+}
\ No newline at end of file
diff --git a/services/task_manager/src/TaskManager.cpp b/services/task_manager/src/TaskManager.cpp
new file mode 100644 (file)
index 0000000..c53f8cf
--- /dev/null
@@ -0,0 +1,144 @@
+/**
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstring>
+#include "SingleoException.h"
+#include "SingleoLog.h"
+#include "TaskManager.h"
+#include <opencv2/opencv.hpp>
+
+using namespace std;
+using namespace singleo::exception;
+
+namespace singleo
+{
+namespace services
+{
+void TaskManager::threadCb(shared_ptr<ITaskNode> &node)
+{
+       vector<BaseResultType> results;
+       vector<shared_ptr<ITaskNode> > &dependencies = node->getDependencies();
+
+       for (auto &n : dependencies) {
+               if (n->getType() != TaskNodeType::INFERENCE)
+                       continue;
+
+               unique_lock<mutex> lock(_mutex);
+
+               // Wait until all nodes added to this node as dependency are completed
+               _event.wait(lock, [n] { return n->isCompleted(); });
+
+               results.push_back(n->getInfereceService()->result());
+       }
+
+       if (node->getType() == TaskNodeType::INFERENCE) {
+               if (_inputs[0]._data_type != DataType::IMAGE) {
+                       SINGLEO_LOGE("Invalid input data type.");
+                       throw InvalidOperation("Invalid input data type");
+               }
+
+               ImageDataType input;
+
+               // If no dependency then use input of current node as input source.
+               // Ps. In case of inference task node, one of both - input from task manager
+               //     (inference_node_a or inference_node_b) or input from dependency node
+               //     output(inference_node_c) - will be used as input source like below graph,
+               //
+               // inference_node_a -----
+               //                      |------ Callback_node ------- inference_node_c
+               // inference_node_b -----
+               if (node->getDependencies().empty())
+                       // TODO. consider for multiple inputs later.
+                       input = dynamic_cast<ImageDataType &>(node->getInputs()[0]);
+               else // Else if dependency then use output of dependency node(callback task node).
+                       input = dynamic_cast<ImageDataType &>(*node->getDependencies()[0]->getOutput());
+
+               node->getInfereceService()->invoke(input);
+
+               // Service request to 'input' has been completed so clean up relevant resources.
+               node->getInputs().clear();
+               _inputs.clear();
+               delete input.ptr;
+       } else if (node->getType() == TaskNodeType::CB) {
+               TaskNodeCb cb = node->getCb();
+               void *user_data = node->getUserData();
+
+               // TODO. consider for multiple inputs later.
+               ImageDataType &input = dynamic_cast<ImageDataType &>(_inputs[0]);
+               node->setOutput(dynamic_pointer_cast<BaseDataType>(make_shared<ImageDataType>()));
+
+               cb(user_data, results, input, dynamic_pointer_cast<BaseDataType>(node->getOutput()));
+       }
+
+       // Set one of the dependent nodes as completed and wake up.
+       node->setCompleted();
+       _event.notify_all();
+}
+
+void TaskManager::addInput(BaseDataType &input)
+{
+       auto imageData = dynamic_cast<ImageDataType &>(input);
+       ImageDataType copied = imageData;
+       size_t buffer_size = imageData.width * imageData.height * imageData.byte_per_pixel;
+
+       copied.ptr = new unsigned char[buffer_size];
+       memcpy(copied.ptr, imageData.ptr, buffer_size);
+
+       _inputs.push_back(dynamic_cast<ImageDataType &>(copied));
+}
+
+vector<ImageDataType> &TaskManager::getInputs()
+{
+       return _inputs;
+}
+
+void TaskManager::addNode(std::shared_ptr<ITaskNode> node)
+{
+       _nodes.push_back(node);
+
+       // Initialize inference service.
+       if (node->getType() == TaskNodeType::INFERENCE) {
+               node->getInfereceService()->configure();
+               node->getInfereceService()->prepare();
+       }
+}
+
+void TaskManager::run()
+{
+       for (auto &n : _nodes) {
+               // Set input as source of current node.
+               // If no dependency then this node has to receive input source
+               // from singleo concrete service such as Autozoom, else if dependency
+               // then the input of the node will be set in threadCb callback function.
+               if (n->getDependencies().empty())
+                       // TODO. consider for multiple sources later.
+                       n->addInput(_inputs[0]);
+
+               _threads.push_back(make_shared<thread>(&TaskManager::threadCb, this, std::ref(n)));
+       }
+
+       _threads[_threads.size() - 1]->join();
+}
+
+BaseResultType &TaskManager::output()
+{
+       auto service = _nodes[_nodes.size() - 1]->getInfereceService();
+
+       return service->result();
+}
+
+}
+}
\ No newline at end of file