#include <condition_variable>
#include <atomic>
#include <functional>
+#include <algorithm>
+#include <chrono>
#include <mv_common.h>
#include <mv_inference_type.h>
+#include "machine_learning_exception.h"
namespace mediavision
{
unsigned long _input_frame_number {};
CallbackType _callback;
- template<typename T> void pushToInput(AsyncInputQueue<T> &inputQueue);
- template<typename T> bool isInputQueueEmpty();
- void waitforInputQueue();
- R popFromOutput();
- bool isOutputQueueEmpty();
- void waitforOutputQueue();
- template<typename T> void inferenceThreadLoop();
- template<typename T> void invoke();
+ template<typename T> void pushToInput(AsyncInputQueue<T> &inputQueue)
+ {
+ std::lock_guard<std::mutex> lock(_incoming_queue_mutex);
+ AsyncInputQueue<unsigned char> dstQueue;
+
+ dstQueue.frame_number = inputQueue.frame_number;
+
+ for (auto &elms : inputQueue.inputs) {
+ std::vector<unsigned char> dst_vector;
+
+ for (auto &elm : elms) {
+ unsigned char *bytes = reinterpret_cast<unsigned char *>(&elm);
+
+ std::copy_n(bytes, sizeof(T), back_inserter(dst_vector));
+ }
+
+ dstQueue.inputs.push_back(dst_vector);
+ }
+
+ _incoming_queue.push(dstQueue);
+ _incoming_queue_event.notify_one();
+ }
+ template<typename T> bool isInputQueueEmpty()
+ {
+ std::lock_guard<std::mutex> lock(_incoming_queue_mutex);
+
+ return _incoming_queue.empty();
+ }
+ R popFromOutput()
+ {
+ std::lock_guard<std::mutex> lock(_outgoing_queue_mutex);
+ R output = _outgoing_queue.front();
+
+ _outgoing_queue.pop();
+
+ return output;
+ }
+ template<typename T> void inferenceThreadLoop()
+ {
+ // If user called destroy API then this thread loop will be terminated.
+ while (!_exit_thread) {
+ // Wait for input data.
+ waitforInputQueue();
+
+ // Exit this thread loop if user called destroy API while in thread loop.
+ if (_exit_thread)
+ break;
+
+ // What the callback function has to do is to,
+ // 1. Get input data from input queue.
+ // 2. Run inference.
+ // 3. Put output data to output queue.
+ _callback();
+ }
+
+ // waitforOutputQueue function could wait for the notify event after while loop is exited.
+ // So make sure to call notify_one() here.
+ _outgoing_queue_event.notify_one();
+ }
+ template<typename T> void invoke()
+ {
+ if (!_thread_handle)
+ _thread_handle = std::make_unique<std::thread>(&AsyncManager::inferenceThreadLoop<T>, this);
+ }
+ void waitforInputQueue()
+ {
+ std::unique_lock<std::mutex> lock(_incoming_queue_mutex);
+
+ if (!_incoming_queue_event.wait_for(lock, std::chrono::seconds(1),
+ [this] { return !_incoming_queue.empty(); })) {
+ LOGD("Waiting for input queue has been timed out.");
+ }
+ }
+ bool isOutputQueueEmpty()
+ {
+ std::lock_guard<std::mutex> lock(_outgoing_queue_mutex);
+
+ return _outgoing_queue.empty();
+ }
+ void waitforOutputQueue()
+ {
+ std::unique_lock<std::mutex> lock(_outgoing_queue_mutex);
+
+ if (!_outgoing_queue_event.wait_for(lock, std::chrono::seconds(10), [this] {
+ if (_exit_thread)
+ throw exception::InvalidOperation("thread is exited already.");
+
+ return !_outgoing_queue.empty();
+ })) {
+ throw exception::InvalidOperation("Waiting for output queue has been timed out.");
+ }
+ }
public:
- AsyncManager(const CallbackType &cb);
+ AsyncManager(const CallbackType &cb) : _callback(cb)
+ {}
virtual ~AsyncManager() = default;
- bool isWorking();
- void stop();
- template<typename T> AsyncInputQueue<T> popFromInput();
- void pushToOutput(R &output);
- template<typename T> void push(std::vector<std::vector<T> > &inputs);
- R pop();
+ bool isWorking()
+ {
+ if (_exit_thread)
+ return false;
+
+ return true;
+ }
+ void stop()
+ {
+ // Make sure to wait for the completion of all inference requests in the incoming queue.
+ _exit_thread = true;
+
+ _thread_handle->join();
+ _thread_handle = nullptr;
+
+ std::lock_guard<std::mutex> lock(_outgoing_queue_mutex);
+ std::queue<R> empty;
+
+ swap(_outgoing_queue, empty);
+ }
+ template<typename T> AsyncInputQueue<T> popFromInput()
+ {
+ std::lock_guard<std::mutex> lock(_incoming_queue_mutex);
+ AsyncInputQueue<unsigned char> inputQueue = _incoming_queue.front();
+
+ _incoming_queue.pop();
+ AsyncInputQueue<T> dstQueue;
+
+ dstQueue.frame_number = inputQueue.frame_number;
+
+ for (auto &elms : inputQueue.inputs) {
+ std::vector<T> dst_vector;
+
+ for (size_t idx = 0; idx < elms.size(); idx += sizeof(T)) {
+ T dst_data;
+
+ std::copy_n(elms.begin() + idx, sizeof(T), reinterpret_cast<unsigned char *>(&dst_data));
+ dst_vector.push_back(dst_data);
+ }
+
+ dstQueue.inputs.push_back(dst_vector);
+ }
+
+ return dstQueue;
+ }
+ void pushToOutput(R &output)
+ {
+ std::lock_guard<std::mutex> lock(_outgoing_queue_mutex);
+
+ _outgoing_queue.push(output);
+ _outgoing_queue_event.notify_one();
+ }
+ template<typename T> void push(std::vector<std::vector<T> > &inputs)
+ {
+ _input_frame_number++;
+
+ if (!isInputQueueEmpty<T>()) {
+ LOGD("input frame number(%ld) has been skipped.", _input_frame_number);
+ return;
+ }
+
+ AsyncInputQueue<T> in_queue = { _input_frame_number, inputs };
+
+ pushToInput<T>(in_queue);
+
+ LOGD("Pushed : input frame number = %lu", in_queue.frame_number);
+
+ invoke<T>();
+ }
+ R pop()
+ {
+ waitforOutputQueue();
+
+ return popFromOutput();
+ }
};
} // machine_learning
+++ /dev/null
-/**
- * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <algorithm>
-
-#include "mv_private.h"
-#include "machine_learning_exception.h"
-#include "async_manager.h"
-#include "object_detection_type.h"
-
-#define IS_INPUT_QUEUE_EMPTY(result_type) \
- template bool AsyncManager<result_type>::isInputQueueEmpty<unsigned char>(); \
- template bool AsyncManager<result_type>::isInputQueueEmpty<float>()
-#define INVOKE(result_type) \
- template void AsyncManager<result_type>::invoke<unsigned char>(); \
- template void AsyncManager<result_type>::invoke<float>()
-#define POP_FROM_OUTPUT(result_type) template result_type AsyncManager<result_type>::popFromOutput()
-#define PUSH_TO_INPUT(result_type) \
- template void AsyncManager<result_type>::pushToInput<unsigned char>(AsyncInputQueue<unsigned char> & inputQueue); \
- template void AsyncManager<result_type>::pushToInput<float>(AsyncInputQueue<float> & inputQueue)
-#define STOP(result_type) template void AsyncManager<result_type>::stop()
-#define ASYNC_MANAGER(result_type) template AsyncManager<result_type>::AsyncManager(const CallbackType &cb)
-#define IS_WORKING(result_type) template bool AsyncManager<result_type>::isWorking()
-#define POP_FROM_INPUT(result_type) \
- template AsyncInputQueue<unsigned char> AsyncManager<result_type>::popFromInput(); \
- template AsyncInputQueue<float> AsyncManager<result_type>::popFromInput()
-#define PUSH_TO_OUTPUT(result_type) template void AsyncManager<result_type>::pushToOutput(result_type &output)
-#define INFERENCE_THREAD_LOOP(result_type) \
- template void AsyncManager<result_type>::inferenceThreadLoop<unsigned char>(); \
- template void AsyncManager<result_type>::inferenceThreadLoop<float>()
-#define PUSH(result_type) \
- template void AsyncManager<result_type>::push(std::vector<std::vector<unsigned char> > &inputs); \
- template void AsyncManager<result_type>::push(std::vector<std::vector<float> > &inputs)
-#define POP(result_type) template result_type AsyncManager<result_type>::pop()
-
-using namespace std;
-using namespace mediavision::machine_learning::exception;
-
-namespace mediavision
-{
-namespace machine_learning
-{
-template<typename R> AsyncManager<R>::AsyncManager(const CallbackType &cb) : _callback(cb)
-{}
-
-template<typename R> bool AsyncManager<R>::isWorking()
-{
- if (_exit_thread)
- return false;
-
- return true;
-}
-
-template<typename R> template<typename T> void AsyncManager<R>::pushToInput(AsyncInputQueue<T> &inputQueue)
-{
- lock_guard<mutex> lock(_incoming_queue_mutex);
- AsyncInputQueue<unsigned char> dstQueue;
-
- dstQueue.frame_number = inputQueue.frame_number;
-
- for (auto &elms : inputQueue.inputs) {
- vector<unsigned char> dst_vector;
-
- for (auto &elm : elms) {
- unsigned char *bytes = reinterpret_cast<unsigned char *>(&elm);
-
- copy_n(bytes, sizeof(T), back_inserter(dst_vector));
- }
-
- dstQueue.inputs.push_back(dst_vector);
- }
-
- _incoming_queue.push(dstQueue);
- _incoming_queue_event.notify_one();
-}
-
-template<typename R> template<typename T> AsyncInputQueue<T> AsyncManager<R>::popFromInput()
-{
- lock_guard<mutex> lock(_incoming_queue_mutex);
- AsyncInputQueue<unsigned char> inputQueue = _incoming_queue.front();
-
- _incoming_queue.pop();
- AsyncInputQueue<T> dstQueue;
-
- dstQueue.frame_number = inputQueue.frame_number;
-
- for (auto &elms : inputQueue.inputs) {
- vector<T> dst_vector;
-
- for (size_t idx = 0; idx < elms.size(); idx += sizeof(T)) {
- T dst_data;
-
- copy_n(elms.begin() + idx, sizeof(T), reinterpret_cast<unsigned char *>(&dst_data));
- dst_vector.push_back(dst_data);
- }
-
- dstQueue.inputs.push_back(dst_vector);
- }
-
- return dstQueue;
-}
-
-template<typename R> template<typename T> bool AsyncManager<R>::isInputQueueEmpty()
-{
- lock_guard<mutex> lock(_incoming_queue_mutex);
-
- return _incoming_queue.empty();
-}
-
-template<typename R> void AsyncManager<R>::waitforInputQueue()
-{
- unique_lock<mutex> lock(_incoming_queue_mutex);
-
- if (!_incoming_queue_event.wait_for(lock, 1s, [this] { return !_incoming_queue.empty(); })) {
- LOGD("Waiting for input queue has been timed out.");
- }
-}
-
-template<typename R> R AsyncManager<R>::popFromOutput()
-{
- lock_guard<mutex> lock(_outgoing_queue_mutex);
- R output = _outgoing_queue.front();
-
- _outgoing_queue.pop();
-
- return output;
-}
-
-template<typename R> bool AsyncManager<R>::isOutputQueueEmpty()
-{
- lock_guard<mutex> lock(_outgoing_queue_mutex);
-
- return _outgoing_queue.empty();
-}
-
-template<typename R> void AsyncManager<R>::waitforOutputQueue()
-{
- unique_lock<mutex> lock(_outgoing_queue_mutex);
-
- if (!_outgoing_queue_event.wait_for(lock, 10s, [this] {
- if (_exit_thread)
- throw InvalidOperation("thread is exited already.");
-
- return !_outgoing_queue.empty();
- })) {
- throw InvalidOperation("Waiting for output queue has been timed out.");
- }
-}
-
-template<typename R> void AsyncManager<R>::pushToOutput(R &output)
-{
- lock_guard<mutex> lock(_outgoing_queue_mutex);
-
- _outgoing_queue.push(output);
- _outgoing_queue_event.notify_one();
-}
-
-template<typename R> template<typename T> void AsyncManager<R>::inferenceThreadLoop()
-{
- // If user called destroy API then this thread loop will be terminated.
- while (!_exit_thread) {
- // Wait for input data.
- waitforInputQueue();
-
- // Exit this thread loop if user called destroy API while in thread loop.
- if (_exit_thread)
- break;
-
- // What the callback function has to do is to,
- // 1. Get input data from input queue.
- // 2. Run inference.
- // 3. Put output data to output queue.
- _callback();
- }
-
- // waitforOutputQueue function could wait for the notify event after while loop is exited.
- // So make sure to call notify_one() here.
- _outgoing_queue_event.notify_one();
-}
-
-template<typename R> template<typename T> void AsyncManager<R>::invoke()
-{
- if (!_thread_handle)
- _thread_handle = make_unique<thread>(&AsyncManager::inferenceThreadLoop<T>, this);
-}
-
-template<typename R> void AsyncManager<R>::stop()
-{
- // Make sure to wait for the completion of all inference requests in the incoming queue.
- _exit_thread = true;
-
- _thread_handle->join();
- _thread_handle = nullptr;
-
- lock_guard<mutex> lock(_outgoing_queue_mutex);
- queue<R> empty;
-
- swap(_outgoing_queue, empty);
-}
-
-template<typename R> template<typename T> void AsyncManager<R>::push(std::vector<std::vector<T> > &inputs)
-{
- _input_frame_number++;
-
- if (!isInputQueueEmpty<T>()) {
- LOGD("input frame number(%ld) has been skipped.", _input_frame_number);
- return;
- }
-
- AsyncInputQueue<T> in_queue = { _input_frame_number, inputs };
-
- pushToInput<T>(in_queue);
-
- LOGD("Pushed : input frame number = %lu", in_queue.frame_number);
-
- invoke<T>();
-}
-
-template<typename R> R AsyncManager<R>::pop()
-{
- waitforOutputQueue();
-
- return popFromOutput();
-}
-
-IS_INPUT_QUEUE_EMPTY(ObjectDetectionResult);
-INVOKE(ObjectDetectionResult);
-POP_FROM_OUTPUT(ObjectDetectionResult);
-PUSH_TO_INPUT(ObjectDetectionResult);
-STOP(ObjectDetectionResult);
-ASYNC_MANAGER(ObjectDetectionResult);
-IS_WORKING(ObjectDetectionResult);
-POP_FROM_INPUT(ObjectDetectionResult);
-PUSH_TO_OUTPUT(ObjectDetectionResult);
-INFERENCE_THREAD_LOOP(ObjectDetectionResult);
-PUSH(ObjectDetectionResult);
-POP(ObjectDetectionResult);
-
-}
-}
\ No newline at end of file