2 * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include "mv_private.h"
20 #include "machine_learning_exception.h"
21 #include "async_manager.h"
22 #include "object_detection_type.h"
24 #define IS_INPUT_QUEUE_EMPTY(result_type) \
25 template bool AsyncManager<result_type>::isInputQueueEmpty<unsigned char>(); \
26 template bool AsyncManager<result_type>::isInputQueueEmpty<float>()
27 #define INVOKE(result_type) \
28 template void AsyncManager<result_type>::invoke<unsigned char>(); \
29 template void AsyncManager<result_type>::invoke<float>()
30 #define POP_FROM_OUTPUT(result_type) template result_type AsyncManager<result_type>::popFromOutput()
31 #define PUSH_TO_INPUT(result_type) \
32 template void AsyncManager<result_type>::pushToInput<unsigned char>(AsyncInputQueue<unsigned char> & inputQueue); \
33 template void AsyncManager<result_type>::pushToInput<float>(AsyncInputQueue<float> & inputQueue)
34 #define STOP(result_type) template void AsyncManager<result_type>::stop()
35 #define ASYNC_MANAGER(result_type) template AsyncManager<result_type>::AsyncManager(const CallbackType &cb)
36 #define IS_WORKING(result_type) template bool AsyncManager<result_type>::isWorking()
37 #define POP_FROM_INPUT(result_type) \
38 template AsyncInputQueue<unsigned char> AsyncManager<result_type>::popFromInput(); \
39 template AsyncInputQueue<float> AsyncManager<result_type>::popFromInput()
40 #define PUSH_TO_OUTPUT(result_type) template void AsyncManager<result_type>::pushToOutput(result_type &output)
41 #define INFERENCE_THREAD_LOOP(result_type) \
42 template void AsyncManager<result_type>::inferenceThreadLoop<unsigned char>(); \
43 template void AsyncManager<result_type>::inferenceThreadLoop<float>()
44 #define PUSH(result_type) \
45 template void AsyncManager<result_type>::push(std::vector<std::vector<unsigned char> > &inputs); \
46 template void AsyncManager<result_type>::push(std::vector<std::vector<float> > &inputs)
47 #define POP(result_type) template result_type AsyncManager<result_type>::pop()
50 using namespace mediavision::machine_learning::exception;
54 namespace machine_learning
56 template<typename R> AsyncManager<R>::AsyncManager(const CallbackType &cb) : _callback(cb)
59 template<typename R> bool AsyncManager<R>::isWorking()
67 template<typename R> template<typename T> void AsyncManager<R>::pushToInput(AsyncInputQueue<T> &inputQueue)
69 lock_guard<mutex> lock(_incoming_queue_mutex);
70 AsyncInputQueue<unsigned char> dstQueue;
72 dstQueue.frame_number = inputQueue.frame_number;
74 for (auto &elms : inputQueue.inputs) {
75 vector<unsigned char> dst_vector;
77 for (auto &elm : elms) {
78 unsigned char *bytes = reinterpret_cast<unsigned char *>(&elm);
80 copy_n(bytes, sizeof(T), back_inserter(dst_vector));
83 dstQueue.inputs.push_back(dst_vector);
86 _incoming_queue.push(dstQueue);
87 _incoming_queue_event.notify_one();
90 template<typename R> template<typename T> AsyncInputQueue<T> AsyncManager<R>::popFromInput()
92 lock_guard<mutex> lock(_incoming_queue_mutex);
93 AsyncInputQueue<unsigned char> inputQueue = _incoming_queue.front();
95 _incoming_queue.pop();
96 AsyncInputQueue<T> dstQueue;
98 dstQueue.frame_number = inputQueue.frame_number;
100 for (auto &elms : inputQueue.inputs) {
101 vector<T> dst_vector;
103 for (size_t idx = 0; idx < elms.size(); idx += sizeof(T)) {
106 copy_n(elms.begin() + idx, sizeof(T), reinterpret_cast<unsigned char *>(&dst_data));
107 dst_vector.push_back(dst_data);
110 dstQueue.inputs.push_back(dst_vector);
116 template<typename R> template<typename T> bool AsyncManager<R>::isInputQueueEmpty()
118 lock_guard<mutex> lock(_incoming_queue_mutex);
120 return _incoming_queue.empty();
123 template<typename R> void AsyncManager<R>::waitforInputQueue()
125 unique_lock<mutex> lock(_incoming_queue_mutex);
127 if (!_incoming_queue_event.wait_for(lock, 1s, [this] { return !_incoming_queue.empty(); })) {
128 LOGD("Waiting for input queue has been timed out.");
132 template<typename R> R AsyncManager<R>::popFromOutput()
134 lock_guard<mutex> lock(_outgoing_queue_mutex);
135 R output = _outgoing_queue.front();
137 _outgoing_queue.pop();
142 template<typename R> bool AsyncManager<R>::isOutputQueueEmpty()
144 lock_guard<mutex> lock(_outgoing_queue_mutex);
146 return _outgoing_queue.empty();
149 template<typename R> void AsyncManager<R>::waitforOutputQueue()
151 unique_lock<mutex> lock(_outgoing_queue_mutex);
153 if (!_outgoing_queue_event.wait_for(lock, 10s, [this] {
155 throw InvalidOperation("thread is exited already.");
157 return !_outgoing_queue.empty();
159 throw InvalidOperation("Waiting for output queue has been timed out.");
163 template<typename R> void AsyncManager<R>::pushToOutput(R &output)
165 lock_guard<mutex> lock(_outgoing_queue_mutex);
167 _outgoing_queue.push(output);
168 _outgoing_queue_event.notify_one();
171 template<typename R> template<typename T> void AsyncManager<R>::inferenceThreadLoop()
173 // If user called destroy API then this thread loop will be terminated.
174 while (!_exit_thread) {
175 // Wait for input data.
178 // Exit this thread loop if user called destroy API while in thread loop.
182 // What the callback function has to do is to,
183 // 1. Get input data from input queue.
185 // 3. Put output data to output queue.
189 // waitforOutputQueue function could wait for the notify event after while loop is exited.
190 // So make sure to call notify_one() here.
191 _outgoing_queue_event.notify_one();
194 template<typename R> template<typename T> void AsyncManager<R>::invoke()
197 _thread_handle = make_unique<thread>(&AsyncManager::inferenceThreadLoop<T>, this);
200 template<typename R> void AsyncManager<R>::stop()
202 // Make sure to wait for the completion of all inference requests in the incoming queue.
205 _thread_handle->join();
206 _thread_handle = nullptr;
208 lock_guard<mutex> lock(_outgoing_queue_mutex);
211 swap(_outgoing_queue, empty);
214 template<typename R> template<typename T> void AsyncManager<R>::push(std::vector<std::vector<T> > &inputs)
216 _input_frame_number++;
218 if (!isInputQueueEmpty<T>()) {
219 LOGD("input frame number(%ld) has been skipped.", _input_frame_number);
223 AsyncInputQueue<T> in_queue = { _input_frame_number, inputs };
225 pushToInput<T>(in_queue);
227 LOGD("Pushed : input frame number = %lu", in_queue.frame_number);
232 template<typename R> R AsyncManager<R>::pop()
234 waitforOutputQueue();
236 return popFromOutput();
239 IS_INPUT_QUEUE_EMPTY(ObjectDetectionResult);
240 INVOKE(ObjectDetectionResult);
241 POP_FROM_OUTPUT(ObjectDetectionResult);
242 PUSH_TO_INPUT(ObjectDetectionResult);
243 STOP(ObjectDetectionResult);
244 ASYNC_MANAGER(ObjectDetectionResult);
245 IS_WORKING(ObjectDetectionResult);
246 POP_FROM_INPUT(ObjectDetectionResult);
247 PUSH_TO_OUTPUT(ObjectDetectionResult);
248 INFERENCE_THREAD_LOOP(ObjectDetectionResult);
249 PUSH(ObjectDetectionResult);
250 POP(ObjectDetectionResult);