From: Inki Dae Date: Tue, 30 Apr 2024 00:55:59 +0000 (+0900) Subject: add task manager support X-Git-Tag: accepted/tizen/unified/20240903.110722~58 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=refs%2Fchanges%2F86%2F310586%2F12;p=platform%2Fcore%2Fapi%2Fsingleo.git add task manager support Add task manager support which provides multi thread based service pipeline configuration feature in runtime. Each service of singleo service framework can be implemented using one or ones of various inference services such as face detection, face landmark detection, object detection, and so on. As for this, task manager consists of below features, TaskManager : Specific service such as Autozoom or service groups - which can be introduced later - has one TaskManager handle. ITaskNode : Interface class which can contain inference service, callback service, and training service(TODO) Basically, TaskManager provides the interface that we can configure specific service using graph concept. I.e., Below graph shows a service which uses two kinds of task interfaces, one is for inference and other is for callback. ---inference_node_a ---- input ---| |----- callback_node ----- inference_node_c ---inference_node_b ---- With above node graph, inference node a and b can be performed in parallel, and both outputs of them goes to callback node. After that, the callback node creates a new data converted by original input data and results came from inference_node_a and inference_node_b so that new data can be allowed by inference node c as input source, and then passes it to the inference node c as input source. Finally, inference node c performs the the service request with the given input and then returns the service result to user. Ps. task service node will be performed with its own thread context. Below is a example which configures above pipeline graph, task_manager = make_unique(); task_manager->input(src); auto face_detection = factory->createFaceDetection(); auto inference_node_a = make_unique(); inference_node_a->addInferenceService(face_detection); task_manager->addNode(inference_node_a); auto image_classification = factory->createImageClassification(); auto inference_node_b = make_unique(); inference_node_b->addInferenceService(image_classification); task_manager->addNode(inference_node_b); auto callback_node = make_unique(); callback_node->setCb(specific_callback, callback_node.get()); callback_node->addDependency(inference_node_a); callback_node->addDependency(inference_node_b); task_manager->addNode(callback_node); auto face_landmark_detection = factory->createFaceLandmarkDetection(); auto inference_node_c = make_unique(); infernce_node_c->addInferenceService(face_landmark_detection); inference_node_c->addDependency(callback_node); task_manger->addNode(inference_node_c); task_manager->run(); result = task_manager->output(); Change-Id: I02cc508ee26aad8312629d0c69300f21f8221ba3 Signed-off-by: Inki Dae --- diff --git a/common/include/SingleoCommonTypes.h b/common/include/SingleoCommonTypes.h index 47ecb88..03b0cd5 100644 --- a/common/include/SingleoCommonTypes.h +++ b/common/include/SingleoCommonTypes.h @@ -61,6 +61,7 @@ struct ImageDataType : public BaseDataType { unsigned int height {}; unsigned int byte_per_pixel {}; ImagePixelFormat pixel_format { ImagePixelFormat::NONE }; + bool custom { false }; }; struct RawDataType : public BaseDataType { diff --git a/services/CMakeLists.txt b/services/CMakeLists.txt index c4fb763..e319e6e 100644 --- a/services/CMakeLists.txt +++ b/services/CMakeLists.txt @@ -9,6 +9,8 @@ FILE(GLOB SINGLEO_SERVICE_SOURCE_FILES "${PROJECT_SOURCE_DIR}/*.cpp" INCLUDE(${ROOT_DIRECTORY}/input/CMakeLists.txt) INCLUDE(${ROOT_DIRECTORY}/inference/CMakeLists.txt) +INCLUDE(task_manager/CMakeLists.txt) + IF (${USE_AUTOZOOM_API}) INCLUDE(auto_zoom/CMakeLists.txt) ENDIF() diff --git a/services/task_manager/CMakeLists.txt b/services/task_manager/CMakeLists.txt new file mode 100644 index 0000000..76e71d8 --- /dev/null +++ b/services/task_manager/CMakeLists.txt @@ -0,0 +1,8 @@ +SET(SINGLEO_SERVICE_SOURCE_FILES + ${SINGLEO_SERVICE_SOURCE_FILES} + task_manager/src/TaskManager.cpp + task_manager/src/InferenceNode.cpp + task_manager/src/CallbackNode.cpp +) + +LIST(APPEND SERVICE_HEADER_LIST ${SERVICE_HEADER_LIST} ${CMAKE_CURRENT_SOURCE_DIR}/task_manager/include) \ No newline at end of file diff --git a/services/task_manager/include/CallbackNode.h b/services/task_manager/include/CallbackNode.h new file mode 100644 index 0000000..0500789 --- /dev/null +++ b/services/task_manager/include/CallbackNode.h @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __CALLBACK_NODE_H__ +#define __CALLBACK_NODE_H__ + +#include +#include + +#include "INode.h" +#include "SingleoException.h" + +namespace singleo +{ +namespace services +{ +class CallbackNode : public INode +{ +private: + std::string _name; + NodeType _type { NodeType::CB }; + std::vector > _dependencies; + NodeCb _cb; + std::vector > _inputs; + std::shared_ptr _output; + void *_user_data { nullptr }; + bool _completed { false }; + std::condition_variable _event; + std::mutex _mutex; + +public: + CallbackNode() = default; + virtual ~CallbackNode() = default; + + NodeType getType() override; + void setName(std::string name) override + { + _name = name; + } + std::string &getName() override + { + return _name; + } + void addInput(std::shared_ptr input) + { + throw exception::InvalidOperation("Not supported."); + } + std::vector > &getInputs() override; + void setInferenceService(std::unique_ptr &&service) override + { + throw exception::InvalidOperation("Not supported."); + } + inference::IInferenceServiceInterface *getInferenceService() override + { + throw exception::InvalidOperation("Not supported."); + } + void setCb(const NodeCb &cb, void *user_data) override; + NodeCb getCb() override; + void setUserData(void *user_data) override; + void *getUserData() override; + void addDependency(std::shared_ptr node) override; + std::vector > &getDependencies() override; + + void setOutput(std::shared_ptr output) override; + std::shared_ptr &getOutput() override; + void wait() override; + void wakeup() override; +}; + +} +} + +#endif \ No newline at end of file diff --git a/services/task_manager/include/INode.h b/services/task_manager/include/INode.h new file mode 100644 index 0000000..11b0a57 --- /dev/null +++ b/services/task_manager/include/INode.h @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __INODE_H__ +#define __INODE_H__ + +#include +#include +#include + +#include "SingleoCommonTypes.h" +#include "ServiceDataType.h" +#include "IInferenceServiceInterface.h" + +namespace singleo +{ +namespace services +{ +enum class NodeType { NONE, INFERENCE, CB }; + +using NodeCb = std::function &results, + std::shared_ptr input)>; + +class INode +{ +public: + virtual ~INode() {}; + + virtual NodeType getType() = 0; + virtual void setName(std::string name) = 0; + virtual std::string &getName() = 0; + virtual void addInput(std::shared_ptr input) = 0; + virtual std::vector > &getInputs() = 0; + virtual void setInferenceService(std::unique_ptr &&service) = 0; + virtual inference::IInferenceServiceInterface *getInferenceService() = 0; + virtual void setCb(const NodeCb &cb, void *user_data) = 0; + virtual NodeCb getCb() = 0; + virtual void setUserData(void *user_data) = 0; + virtual void *getUserData() = 0; + virtual void addDependency(std::shared_ptr node) = 0; + virtual std::vector > &getDependencies() = 0; + virtual void setOutput(std::shared_ptr output) = 0; + virtual std::shared_ptr &getOutput() = 0; + virtual void wait() = 0; + virtual void wakeup() = 0; +}; + +} +} + +#endif \ No newline at end of file diff --git a/services/task_manager/include/InferenceNode.h b/services/task_manager/include/InferenceNode.h new file mode 100644 index 0000000..b87a86c --- /dev/null +++ b/services/task_manager/include/InferenceNode.h @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __INFERENCE_NODE_H__ +#define __INFERENCE_NODE_H__ + +#include +#include + +#include "INode.h" +#include "SingleoException.h" + +namespace singleo +{ +namespace services +{ +class InferenceNode : public INode +{ +private: + NodeType _type { NodeType::INFERENCE }; + std::string _name; + std::unique_ptr _service; + std::vector > _dependencies; + std::vector > _inputs; + std::shared_ptr _output; + bool _completed { false }; + std::condition_variable _event; + std::mutex _mutex; + +public: + InferenceNode() = default; + virtual ~InferenceNode() = default; + + NodeType getType() override; + void setName(std::string name) override + { + _name = name; + } + std::string &getName() override + { + return _name; + } + void addInput(std::shared_ptr input) override; + std::vector > &getInputs() override; + void setInferenceService(std::unique_ptr &&service) override; + inference::IInferenceServiceInterface *getInferenceService() override; + void setCb(const NodeCb &cb, void *user_data) override + { + throw exception::InvalidOperation("Not supported."); + }; + NodeCb getCb() override + { + throw exception::InvalidOperation("Not supported."); + }; + void setUserData(void *user_data) override + { + throw exception::InvalidOperation("Not supported."); + }; + void *getUserData() override + { + throw exception::InvalidOperation("Not supported."); + }; + void addDependency(std::shared_ptr node) override; + std::vector > &getDependencies() override; + void setOutput(std::shared_ptr output) override; + std::shared_ptr &getOutput() override; + void wait() override; + void wakeup() override; +}; + +} +} + +#endif \ No newline at end of file diff --git a/services/task_manager/include/TaskManager.h b/services/task_manager/include/TaskManager.h new file mode 100644 index 0000000..03cdcba --- /dev/null +++ b/services/task_manager/include/TaskManager.h @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __TASK_MANAGER_H__ +#define __TASK_MANAGER_H__ + +#include +#include +#include + +#include "IInferenceServiceInterface.h" +#include "SingleoCommonTypes.h" +#include "INode.h" + +namespace singleo +{ +namespace services +{ +class TaskManager +{ +private: + std::vector > _inputs; + std::vector > _nodes; + std::vector > _threads; + + void threadCb(std::shared_ptr &node); + +public: + TaskManager() = default; + ~TaskManager() = default; + + void addInput(BaseDataType &input); + std::vector > &getInputs(); + void addNode(std::shared_ptr node); + void run(); + BaseResultType &output(); + void clear(); +}; + +} +} + +#endif \ No newline at end of file diff --git a/services/task_manager/src/CallbackNode.cpp b/services/task_manager/src/CallbackNode.cpp new file mode 100644 index 0000000..5be4e4b --- /dev/null +++ b/services/task_manager/src/CallbackNode.cpp @@ -0,0 +1,97 @@ +/** + * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "CallbackNode.h" + +using namespace std; + +namespace singleo +{ +namespace services +{ +NodeType CallbackNode::getType() +{ + return _type; +} + +vector > &CallbackNode::getInputs() +{ + return _inputs; +} + +void CallbackNode::setCb(const NodeCb &cb, void *user_data) +{ + _cb = cb; + _user_data = user_data; +} + +NodeCb CallbackNode::getCb() +{ + return _cb; +} + +void CallbackNode::setUserData(void *user_data) +{ + _user_data = user_data; +} + +void *CallbackNode::getUserData() +{ + return _user_data; +} + +void CallbackNode::addDependency(std::shared_ptr node) +{ + _dependencies.push_back(node); +} + +std::vector > &CallbackNode::getDependencies() +{ + return _dependencies; +} + +void CallbackNode::setOutput(std::shared_ptr output) +{ + _output = dynamic_pointer_cast(output); +} + +std::shared_ptr &CallbackNode::getOutput() +{ + return _output; +} + +void CallbackNode::wait() +{ + unique_lock lock(_mutex); + + // If already completed then just return. + if (_completed) + return; + + _event.wait(lock, [this] { return this->_completed; }); +} + +void CallbackNode::wakeup() +{ + unique_lock lock(_mutex); + + _completed = true; + _event.notify_all(); +} + +} +} \ No newline at end of file diff --git a/services/task_manager/src/InferenceNode.cpp b/services/task_manager/src/InferenceNode.cpp new file mode 100644 index 0000000..5cdc6f4 --- /dev/null +++ b/services/task_manager/src/InferenceNode.cpp @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "InferenceNode.h" + +using namespace std; +using namespace singleo::inference; + +namespace singleo +{ +namespace services +{ +NodeType InferenceNode::getType() +{ + return _type; +} + +void InferenceNode::addInput(shared_ptr input) +{ + _inputs.push_back(input); +} + +vector > &InferenceNode::getInputs() +{ + return _inputs; +} + +void InferenceNode::setInferenceService(unique_ptr &&service) +{ + _service = move(service); +} + +IInferenceServiceInterface *InferenceNode::getInferenceService() +{ + return _service.get(); +} + +void InferenceNode::addDependency(std::shared_ptr node) +{ + _dependencies.push_back(node); +} + +std::vector > &InferenceNode::getDependencies() +{ + return _dependencies; +} + +void InferenceNode::setOutput(std::shared_ptr output) +{ + _output = dynamic_pointer_cast(output); +} + +std::shared_ptr &InferenceNode::getOutput() +{ + return _output; +} + +void InferenceNode::wait() +{ + unique_lock lock(_mutex); + + // If already completed then just return. + if (_completed) + return; + + _event.wait(lock, [this] { return this->_completed; }); +} + +void InferenceNode::wakeup() +{ + unique_lock lock(_mutex); + + _completed = true; + _event.notify_all(); +} + +} +} \ No newline at end of file diff --git a/services/task_manager/src/TaskManager.cpp b/services/task_manager/src/TaskManager.cpp new file mode 100644 index 0000000..3cb7e44 --- /dev/null +++ b/services/task_manager/src/TaskManager.cpp @@ -0,0 +1,160 @@ +/** + * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "SingleoException.h" +#include "SingleoLog.h" +#include "TaskManager.h" + +using namespace std; +using namespace singleo::exception; + +namespace singleo +{ +namespace services +{ +void TaskManager::threadCb(shared_ptr &node) +{ + vector results; + vector > &dependencies = node->getDependencies(); + + SINGLEO_LOGD("Launched node name = %s", node->getName().c_str()); + + for (auto &n : dependencies) { + // Add the result if dependency node is inference service not callback. + if (n->getType() == NodeType::INFERENCE) + results.push_back(&n->getInferenceService()->result()); + } + + if (node->getType() == NodeType::INFERENCE) { + if (_inputs[0]->_data_type != DataType::IMAGE) { + SINGLEO_LOGE("Invalid input data type."); + throw InvalidOperation("Invalid input data type"); + } + + shared_ptr input; + + // If no dependency then use input of current node as input source. + // Ps. In case of inference node, one of both - input from task manager + // (inference_node_a or inference_node_b) or input from dependency node + // output(inference_node_c) - will be used as input source like below graph, + // + // inference_node_a ----- + // |------ Callback_node ------- inference_node_c + // inference_node_b ----- + if (node->getDependencies().empty()) + // TODO. consider for multiple inputs later. + input = dynamic_pointer_cast(node->getInputs()[0]); + else // Else if dependency then use output of dependency node(callback node). + input = dynamic_pointer_cast(node->getDependencies()[0]->getOutput()); + + node->getInferenceService()->invoke(*input); + + // Inference request has been completed so release input data if the data was internally allocated by callback node. + if (input->custom) + delete input->ptr; + + // Service request to 'input' has been completed so clean up it. + node->getInputs().clear(); + } else if (node->getType() == NodeType::CB) { + NodeCb cb = node->getCb(); + void *user_data = node->getUserData(); + + // Call the callback function registered in this callback node. + // In this callback, new data should be set by calling node->setOutput() + // if new data is made. TODO. consider for multiple inputs later. + cb(user_data, results, _inputs[0]); + } + + // Wake up. + node->wakeup(); +} + +void TaskManager::addInput(BaseDataType &input) +{ + auto imageData = dynamic_cast(input); + + _inputs.push_back(make_shared(imageData)); +} + +vector > &TaskManager::getInputs() +{ + return _inputs; +} + +void TaskManager::addNode(std::shared_ptr node) +{ + _nodes.push_back(node); + + // Initialize inference service. + if (node->getType() == NodeType::INFERENCE) { + node->getInferenceService()->configure(); + node->getInferenceService()->prepare(); + } +} + +void TaskManager::run() +{ + if (_inputs.empty()) { + SINGLEO_LOGE("No input source."); + throw InvalidOperation("No input source."); + } + + for (auto &n : _nodes) { + // Set input as source of current node. + // If no dependency then this node has to receive input source + // from singleo concrete service such as Autozoom, else if dependency + // then the input of the node will be set in threadCb callback function. + if (n->getDependencies().empty()) { + if (n->getType() != NodeType::INFERENCE) { + SINGLEO_LOGE("root node should be inference node."); + throw InvalidOperation("root node should be inference node."); + } + + // TODO. consider for multiple sources later. + n->addInput(_inputs[0]); + } + + vector > &dependencies = n->getDependencies(); + + // Wait until all nodes added to this node as dependency are completed + for (auto &d : dependencies) + d->wait(); + + _threads.push_back(make_shared(&TaskManager::threadCb, this, std::ref(n))); + } + + for (auto &t : _threads) + t->join(); + + _inputs.clear(); + _threads.clear(); +} + +BaseResultType &TaskManager::output() +{ + auto service = _nodes[_nodes.size() - 1]->getInferenceService(); + + return service->result(); +} + +void TaskManager::clear() +{ + _nodes.clear(); +} + +} +} \ No newline at end of file