-project(MV_MACHINE_LEARNING_COMMON)
+project("mv_ml_common")
cmake_minimum_required(VERSION 2.6...3.13)
+pkg_check_modules(${PROJECT_NAME}_DEP REQUIRED inference-engine-interface-common)
+file(GLOB MV_ML_COMMON_SOURCE_LIST "${PROJECT_SOURCE_DIR}/src/*.cpp")
+
+add_library(${PROJECT_NAME} SHARED ${MV_ML_COMMON_SOURCE_LIST})
+
+target_link_libraries(${PROJECT_NAME} ${MV_COMMON_LIB_NAME} ${OpenCV_LIBS} ${${PROJECT_NAME}_DEP_LIBRARIES})
+target_include_directories(${PROJECT_NAME} PRIVATE include ../../common/include ../object_detection/include)
+install(TARGETS ${PROJECT_NAME} DESTINATION ${LIB_INSTALL_DIR})
install(
DIRECTORY ${PROJECT_SOURCE_DIR}/include/ DESTINATION include/media
FILES_MATCHING
--- /dev/null
+/**
+ * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __ASYNC_MANAGER_H__
+#define __ASYNC_MANAGER_H__
+
+#include <queue>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <atomic>
+#include <functional>
+
+#include <mv_common.h>
+#include <mv_inference_type.h>
+
+namespace mediavision
+{
+namespace machine_learning
+{
+template<typename T> struct AsyncInputQueue {
+ unsigned long frame_number {};
+ void *handle {};
+ mv_source_h inference_src;
+ std::vector<std::vector<T> > inputs;
+ void *user_data {};
+};
+
+template<typename R> class AsyncManager
+{
+private:
+ using CallbackType = std::function<void()>;
+
+ std::queue<AsyncInputQueue<unsigned char> > _incoming_queue;
+ std::queue<R> _outgoing_queue;
+ std::mutex _incoming_queue_mutex;
+ std::mutex _outgoing_queue_mutex;
+ std::unique_ptr<std::thread> _thread_handle;
+ std::atomic<bool> _exit_thread { false };
+ std::condition_variable _outgoing_queue_event;
+ std::condition_variable _incoming_queue_event;
+ CallbackType _callback;
+
+public:
+ AsyncManager(const CallbackType &cb);
+ virtual ~AsyncManager() = default;
+
+ bool isWorking();
+ template<typename T> void pushToInput(AsyncInputQueue<T> &inputQueue);
+ template<typename T> AsyncInputQueue<T> popFromInput();
+ template<typename T> bool isInputQueueEmpty();
+ void waitforInputQueue();
+ R popFromOutput();
+ bool isOutputQueueEmpty();
+ void waitforOutputQueue();
+ void pushToOutput(R &output);
+ template<typename T> void inferenceThreadLoop();
+ template<typename T> void invoke();
+ void stop();
+};
+
+} // machine_learning
+} // mediavision
+
+#endif
\ No newline at end of file
--- /dev/null
+/**
+ * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+
+#include "mv_private.h"
+#include "machine_learning_exception.h"
+#include "async_manager.h"
+#include "object_detection_type.h"
+
+#define IS_INPUT_QUEUE_EMPTY(result_type) \
+ template bool AsyncManager<result_type>::isInputQueueEmpty<unsigned char>(); \
+ template bool AsyncManager<result_type>::isInputQueueEmpty<float>()
+#define INVOKE(result_type) \
+ template void AsyncManager<result_type>::invoke<unsigned char>(); \
+ template void AsyncManager<result_type>::invoke<float>()
+#define POP_FROM_OUTPUT(result_type) template ObjectDetectionResult AsyncManager<result_type>::popFromOutput()
+#define PUSH_TO_INPUT(result_type) \
+ template void AsyncManager<result_type>::pushToInput<unsigned char>(AsyncInputQueue<unsigned char> & inputQueue); \
+ template void AsyncManager<result_type>::pushToInput<float>(AsyncInputQueue<float> & inputQueue)
+#define STOP(result_type) template void AsyncManager<result_type>::stop()
+#define ASYNC_MANAGER(result_type) template AsyncManager<result_type>::AsyncManager(const CallbackType &cb)
+#define IS_WORKING(result_type) template bool AsyncManager<result_type>::isWorking()
+#define POP_FROM_INPUT(result_type) \
+ template AsyncInputQueue<unsigned char> AsyncManager<result_type>::popFromInput(); \
+ template AsyncInputQueue<float> AsyncManager<ObjectDetectionResult>::popFromInput()
+#define WAIT_FOR_OUTPUT_QUEUE(result_type) template void AsyncManager<result_type>::waitforOutputQueue()
+#define WAIT_FOR_INPUT_QUEUE(result_type) template void AsyncManager<result_type>::waitforInputQueue()
+#define PUSH_TO_OUTPUT(result_type) template void AsyncManager<result_type>::pushToOutput(result_type &output)
+#define INFERENCE_THREAD_LOOP(result_type) \
+ template void AsyncManager<result_type>::inferenceThreadLoop<unsigned char>(); \
+ template void AsyncManager<result_type>::inferenceThreadLoop<float>()
+
+using namespace std;
+using namespace mediavision::machine_learning::exception;
+
+namespace mediavision
+{
+namespace machine_learning
+{
+template<typename R> AsyncManager<R>::AsyncManager(const CallbackType &cb) : _callback(cb)
+{}
+
+template<typename R> bool AsyncManager<R>::isWorking()
+{
+ if (_exit_thread)
+ return false;
+
+ return true;
+}
+
+template<typename R> template<typename T> void AsyncManager<R>::pushToInput(AsyncInputQueue<T> &inputQueue)
+{
+ lock_guard<mutex> lock(_incoming_queue_mutex);
+ AsyncInputQueue<unsigned char> dstQueue;
+
+ dstQueue.frame_number = inputQueue.frame_number;
+ dstQueue.handle = inputQueue.handle;
+ dstQueue.inference_src = inputQueue.inference_src;
+ dstQueue.user_data = inputQueue.user_data;
+
+ for (auto &elms : inputQueue.inputs) {
+ vector<unsigned char> dst_vector;
+
+ for (auto &elm : elms) {
+ unsigned char *bytes = reinterpret_cast<unsigned char *>(&elm);
+
+ copy_n(bytes, sizeof(T), back_inserter(dst_vector));
+ }
+
+ dstQueue.inputs.push_back(dst_vector);
+ }
+
+ _incoming_queue.push(dstQueue);
+ _incoming_queue_event.notify_one();
+}
+
+template<typename R> template<typename T> AsyncInputQueue<T> AsyncManager<R>::popFromInput()
+{
+ lock_guard<mutex> lock(_incoming_queue_mutex);
+ AsyncInputQueue<unsigned char> inputQueue = _incoming_queue.front();
+
+ _incoming_queue.pop();
+ AsyncInputQueue<T> dstQueue;
+
+ dstQueue.frame_number = inputQueue.frame_number;
+ dstQueue.handle = inputQueue.handle;
+ dstQueue.inference_src = inputQueue.inference_src;
+ dstQueue.user_data = inputQueue.user_data;
+
+ for (auto &elms : inputQueue.inputs) {
+ vector<T> dst_vector;
+
+ for (size_t idx = 0; idx < elms.size(); idx += sizeof(T)) {
+ T dst_data;
+
+ copy_n(elms.begin() + idx, sizeof(T), reinterpret_cast<unsigned char *>(&dst_data));
+ dst_vector.push_back(dst_data);
+ }
+
+ dstQueue.inputs.push_back(dst_vector);
+ }
+
+ return dstQueue;
+}
+
+template<typename R> template<typename T> bool AsyncManager<R>::isInputQueueEmpty()
+{
+ lock_guard<mutex> lock(_incoming_queue_mutex);
+
+ return _incoming_queue.empty();
+}
+
+template<typename R> void AsyncManager<R>::waitforInputQueue()
+{
+ unique_lock<mutex> lock(_incoming_queue_mutex);
+
+ if (!_incoming_queue_event.wait_for(lock, 1s, [this] { return !_incoming_queue.empty(); })) {
+ LOGD("Waiting for input queue has been timed out.");
+ }
+}
+
+template<typename R> R AsyncManager<R>::popFromOutput()
+{
+ lock_guard<mutex> lock(_outgoing_queue_mutex);
+ R output = _outgoing_queue.front();
+
+ _outgoing_queue.pop();
+
+ return output;
+}
+
+template<typename R> bool AsyncManager<R>::isOutputQueueEmpty()
+{
+ lock_guard<mutex> lock(_outgoing_queue_mutex);
+
+ return _outgoing_queue.empty();
+}
+
+template<typename R> void AsyncManager<R>::waitforOutputQueue()
+{
+ unique_lock<mutex> lock(_outgoing_queue_mutex);
+
+ if (!_outgoing_queue_event.wait_for(lock, 10s, [this] {
+ if (_exit_thread)
+ throw InvalidOperation("thread is exited already.");
+
+ return !_outgoing_queue.empty();
+ })) {
+ throw InvalidOperation("Waiting for output queue has been timed out.");
+ }
+}
+
+template<typename R> void AsyncManager<R>::pushToOutput(R &output)
+{
+ lock_guard<mutex> lock(_outgoing_queue_mutex);
+
+ _outgoing_queue.push(output);
+ _outgoing_queue_event.notify_one();
+}
+
+template<typename R> template<typename T> void AsyncManager<R>::inferenceThreadLoop()
+{
+ // If user called destroy API then this thread loop will be terminated.
+ while (!_exit_thread) {
+ // Wait for input data.
+ waitforInputQueue();
+
+ // Exit this thread loop if user called destroy API while in thread loop.
+ if (_exit_thread)
+ break;
+
+ // What the callback function has to do is to,
+ // 1. Get input data from input queue.
+ // 2. Run inference.
+ // 3. Put output data to output queue.
+ _callback();
+ }
+
+ // waitforOutputQueue function could wait for the notify event after while loop is exited.
+ // So make sure to call notify_one() here.
+ _outgoing_queue_event.notify_one();
+}
+
+template<typename R> template<typename T> void AsyncManager<R>::invoke()
+{
+ if (!_thread_handle)
+ _thread_handle = make_unique<thread>(&AsyncManager::inferenceThreadLoop<T>, this);
+}
+
+template<typename R> void AsyncManager<R>::stop()
+{
+ // Make sure to wait for the completion of all inference requests in the incoming queue.
+ _exit_thread = true;
+
+ _thread_handle->join();
+ _thread_handle = nullptr;
+
+ lock_guard<mutex> lock(_outgoing_queue_mutex);
+ queue<R> empty;
+
+ swap(_outgoing_queue, empty);
+}
+
+IS_INPUT_QUEUE_EMPTY(ObjectDetectionResult);
+INVOKE(ObjectDetectionResult);
+POP_FROM_OUTPUT(ObjectDetectionResult);
+PUSH_TO_INPUT(ObjectDetectionResult);
+STOP(ObjectDetectionResult);
+ASYNC_MANAGER(ObjectDetectionResult);
+IS_WORKING(ObjectDetectionResult);
+POP_FROM_INPUT(ObjectDetectionResult);
+WAIT_FOR_OUTPUT_QUEUE(ObjectDetectionResult);
+WAIT_FOR_INPUT_QUEUE(ObjectDetectionResult);
+PUSH_TO_OUTPUT(ObjectDetectionResult);
+INFERENCE_THREAD_LOOP(ObjectDetectionResult);
+
+}
+}
\ No newline at end of file
endif()
add_library(${PROJECT_NAME} SHARED ${MV_OBJECT_DETECTION_SOURCE_LIST})
-target_link_libraries(${PROJECT_NAME} ${MV_COMMON_LIB_NAME} ${OpenCV_LIBS} ${${PROJECT_NAME}_DEP_LIBRARIES} mv_inference)
+target_link_libraries(${PROJECT_NAME} ${MV_COMMON_LIB_NAME} ${OpenCV_LIBS} ${${PROJECT_NAME}_DEP_LIBRARIES} mv_inference mv_ml_common)
target_include_directories(${PROJECT_NAME} PRIVATE include ../inference/include ../common/include ../meta/include)
install(TARGETS ${PROJECT_NAME} DESTINATION ${LIB_INSTALL_DIR})
install(
#include "ObjectDetectionParser.h"
#include "machine_learning_preprocess.h"
#include "iobject_detection.h"
+#include "async_manager.h"
namespace mediavision
{
{
private:
ObjectDetectionTaskType _task_type;
- std::queue<ObjectDetectionQueue<unsigned char> > _incoming_queue;
- std::queue<ObjectDetectionResult> _outgoing_queue;
- std::mutex _incoming_queue_mutex;
- std::mutex _outgoing_queue_mutex;
- std::unique_ptr<std::thread> _thread_handle;
- std::atomic<bool> _exit_thread { false };
+ std::unique_ptr<AsyncManager<ObjectDetectionResult> > _async_manager;
ObjectDetectionResult _current_result {};
unsigned long _input_frame_number {};
- std::condition_variable _cv_event;
void loadLabel();
void getEngineList();
void getDeviceList(const char *engine_type);
template<typename T>
void preprocess(mv_source_h &mv_src, std::shared_ptr<MetaInfo> metaInfo, std::vector<T> &inputVector);
- template<typename T> void pushToInput(ObjectDetectionQueue<T> &inputQueue);
- template<typename V> V popFromOutput();
- bool isOutputQueueEmpty();
- void waitforOutputQueue();
- template<typename T> ObjectDetectionQueue<T> popFromInput();
- template<typename T> bool isInputQueueEmpty();
- template<typename V> void pushToOutput(V &output);
- template<typename T, typename V> void inferenceThreadLoop();
std::shared_ptr<MetaInfo> getInputMetaInfo();
template<typename T> void perform(mv_source_h &mv_src, std::shared_ptr<MetaInfo> metaInfo);
template<typename T> void performAsync(ObjectDetectionInput &input, std::shared_ptr<MetaInfo> metaInfo);
void performAsync(ObjectDetectionInput &input) override;
ObjectDetectionResult &getOutput() override;
ObjectDetectionResult &getOutputCache() override;
+ template<typename T, typename R> void inferenceCallback();
};
} // machine_learning
#ifndef __OBJECT_DETECTION_TYPE_H__
#define __OBJECT_DETECTION_TYPE_H__
+#include <opencv2/core.hpp>
+
#include <mv_common.h>
#include <mv_inference_type.h>
#include <mv_object_detection_type.h>
// TODO.
};
-template<typename T> struct ObjectDetectionQueue {
- unsigned long frame_number {};
- void *handle {};
- mv_source_h inference_src;
- std::vector<std::vector<T> > inputs;
- void *user_data {};
-};
-
/**
* @brief The object detection result structure.
* @details Contains object detection result.
void ObjectDetection::preDestroy()
{
- if (!_thread_handle)
+ if (!_async_manager)
return;
- // Make sure to wait for the completion of all inference requests in the incoming queue.
- _exit_thread = true;
-
- _thread_handle->join();
- _thread_handle = nullptr;
-
- lock_guard<mutex> lock(_outgoing_queue_mutex);
- queue<ObjectDetectionResult> empty;
-
- swap(_outgoing_queue, empty);
+ _async_manager->stop();
}
ObjectDetectionTaskType ObjectDetection::getTaskType()
throw InvalidOperation("Invalid model data type.");
}
-template<typename T, typename V> void ObjectDetection::inferenceThreadLoop()
+template<typename T, typename R> void ObjectDetection::inferenceCallback()
{
- // If user called destroy API then this thread loop will be terminated.
- while (!_exit_thread) {
- // If input queue is empty then skip inference request.
- if (isInputQueueEmpty<T>())
- continue;
-
- ObjectDetectionQueue<T> input = popFromInput<T>();
+ AsyncInputQueue<T> inputQueue = _async_manager->popFromInput<T>();
- LOGD("Popped : input frame number = %lu", input.frame_number);
+ inference<T>(inputQueue.inputs);
- inference<T>(input.inputs);
+ R &resultQueue = result();
- V &resultQueue = result();
- resultQueue.frame_number = input.frame_number;
+ resultQueue.frame_number = inputQueue.frame_number;
- pushToOutput<V>(resultQueue);
- }
-
- // waitforOutputQueue function could wait for the notify event after while loop is exited.
- // So make sure to call notify_one() here.
- _cv_event.notify_one();
+ _async_manager->pushToOutput(resultQueue);
}
template<typename T> void ObjectDetection::performAsync(ObjectDetectionInput &input, shared_ptr<MetaInfo> metaInfo)
{
_input_frame_number++;
- if (!isInputQueueEmpty<T>())
+ if (!_async_manager) {
+ _async_manager = make_unique<AsyncManager<ObjectDetectionResult> >(
+ [this]() { inferenceCallback<T, ObjectDetectionResult>(); });
+ }
+
+ if (!_async_manager->isInputQueueEmpty<T>()) {
+ LOGD("input frame number(%ld) has been skipped.", _input_frame_number);
return;
+ }
vector<T> inputVector;
preprocess<T>(input.inference_src, metaInfo, inputVector);
vector<vector<T> > inputVectors = { inputVector };
- ObjectDetectionQueue<T> in_queue = { _input_frame_number, input.handle, input.inference_src, inputVectors };
+ AsyncInputQueue<T> in_queue = { _input_frame_number, input.handle, input.inference_src, inputVectors };
+
+ _async_manager->pushToInput<T>(in_queue);
- pushToInput<T>(in_queue);
LOGD("Pushed : input frame number = %lu", in_queue.frame_number);
- if (!_thread_handle)
- _thread_handle = make_unique<thread>(&ObjectDetection::inferenceThreadLoop<T, ObjectDetectionResult>, this);
+ _async_manager->invoke<T>();
}
void ObjectDetection::performAsync(ObjectDetectionInput &input)
ObjectDetectionResult &ObjectDetection::getOutput()
{
- if (_thread_handle) {
- if (_exit_thread)
- throw InvalidOperation("Object detection is already destroyed so invalid operation.");
+ if (_async_manager) {
+ if (!_async_manager->isWorking())
+ throw InvalidOperation("Object detection has been already destroyed so invalid operation.");
- waitforOutputQueue();
- _current_result = popFromOutput<ObjectDetectionResult>();
+ _async_manager->waitforOutputQueue();
+ _current_result = _async_manager->popFromOutput();
} else {
// TODO. Check if inference request is completed or not here.
// If not then throw an exception.
copy(&raw_buffer[0], &raw_buffer[tensor_buffer->size / sizeof(float)], back_inserter(tensor));
}
-template<typename T> void ObjectDetection::pushToInput(ObjectDetectionQueue<T> &inputQueue)
-{
- lock_guard<mutex> lock(_incoming_queue_mutex);
- ObjectDetectionQueue<unsigned char> dstQueue;
-
- dstQueue.frame_number = inputQueue.frame_number;
- dstQueue.handle = inputQueue.handle;
- dstQueue.inference_src = inputQueue.inference_src;
- dstQueue.user_data = inputQueue.user_data;
-
- for (auto &elms : inputQueue.inputs) {
- vector<unsigned char> dst_vector;
-
- for (auto &elm : elms) {
- unsigned char *bytes = reinterpret_cast<unsigned char *>(&elm);
-
- copy_n(bytes, sizeof(T), back_inserter(dst_vector));
- }
-
- dstQueue.inputs.push_back(dst_vector);
- }
-
- _incoming_queue.push(dstQueue);
-}
-
-template<typename T> ObjectDetectionQueue<T> ObjectDetection::popFromInput()
-{
- lock_guard<mutex> lock(_incoming_queue_mutex);
- ObjectDetectionQueue<unsigned char> inputQueue = _incoming_queue.front();
-
- _incoming_queue.pop();
- ObjectDetectionQueue<T> dstQueue;
-
- dstQueue.frame_number = inputQueue.frame_number;
- dstQueue.handle = inputQueue.handle;
- dstQueue.inference_src = inputQueue.inference_src;
- dstQueue.user_data = inputQueue.user_data;
-
- for (auto &elms : inputQueue.inputs) {
- vector<T> dst_vector;
-
- for (size_t idx = 0; idx < elms.size(); idx += sizeof(T)) {
- T dst_data;
-
- copy_n(elms.begin() + idx, sizeof(T), reinterpret_cast<unsigned char *>(&dst_data));
- dst_vector.push_back(dst_data);
- }
-
- dstQueue.inputs.push_back(dst_vector);
- }
-
- return dstQueue;
-}
-
-template<typename T> bool ObjectDetection::isInputQueueEmpty()
-{
- lock_guard<mutex> lock(_incoming_queue_mutex);
-
- return _incoming_queue.empty();
-}
-
-template<typename V> void ObjectDetection::pushToOutput(V &output)
-{
- lock_guard<mutex> lock(_outgoing_queue_mutex);
-
- _outgoing_queue.push(output);
- _cv_event.notify_one();
-}
-
-template<typename V> V ObjectDetection::popFromOutput()
-{
- lock_guard<mutex> lock(_outgoing_queue_mutex);
- V output = _outgoing_queue.front();
-
- _outgoing_queue.pop();
-
- return output;
-}
-
-bool ObjectDetection::isOutputQueueEmpty()
-{
- lock_guard<mutex> lock(_outgoing_queue_mutex);
- return _outgoing_queue.empty();
-}
-
-void ObjectDetection::waitforOutputQueue()
-{
- unique_lock<mutex> lock(_outgoing_queue_mutex);
-
- if (!_cv_event.wait_for(lock, 10s, [this] {
- if (_exit_thread)
- throw InvalidOperation("already thread exit");
-
- return !_outgoing_queue.empty();
- })) {
- throw InvalidOperation("Waiting for output queue has been timed out.");
- }
-}
-
template void ObjectDetection::preprocess<float>(mv_source_h &mv_src, shared_ptr<MetaInfo> metaInfo,
vector<float> &inputVector);
template void ObjectDetection::inference<float>(vector<vector<float> > &inputVectors);
template void ObjectDetection::perform<float>(mv_source_h &mv_src, shared_ptr<MetaInfo> metaInfo);
-template void ObjectDetection::pushToInput<float>(ObjectDetectionQueue<float> &inputQueue);
-template ObjectDetectionQueue<float> ObjectDetection::popFromInput();
-template bool ObjectDetection::isInputQueueEmpty<float>();
template void ObjectDetection::performAsync<float>(ObjectDetectionInput &input, shared_ptr<MetaInfo> metaInfo);
template void ObjectDetection::preprocess<unsigned char>(mv_source_h &mv_src, shared_ptr<MetaInfo> metaInfo,
vector<unsigned char> &inputVector);
template void ObjectDetection::inference<unsigned char>(vector<vector<unsigned char> > &inputVectors);
template void ObjectDetection::perform<unsigned char>(mv_source_h &mv_src, shared_ptr<MetaInfo> metaInfo);
-template void ObjectDetection::pushToInput<unsigned char>(ObjectDetectionQueue<unsigned char> &inputQueue);
-template ObjectDetectionQueue<unsigned char> ObjectDetection::popFromInput();
-template bool ObjectDetection::isInputQueueEmpty<unsigned char>();
template void ObjectDetection::performAsync<unsigned char>(ObjectDetectionInput &input, shared_ptr<MetaInfo> metaInfo);
}
%manifest %{name}.manifest
%license LICENSE.APLv2
%{_libdir}/libmv_inference*.so
+%{_libdir}/libmv_ml_common.so
%if "%{enable_ml_face_recognition}" == "1"
%{_datadir}/%{name}/face_recognition.json
%{_libdir}/libmv_training.so