mv_machine_learning: code refactoring to AsyncManager
[platform/core/api/mediavision.git] / mv_machine_learning / common / src / async_manager.cpp
1 /**
2  * Copyright (c) 2023 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <algorithm>
18
19 #include "mv_private.h"
20 #include "machine_learning_exception.h"
21 #include "async_manager.h"
22 #include "object_detection_type.h"
23
// Helper macros for explicit template instantiation.
//
// The AsyncManager member functions are defined as templates in this file, so each
// macro below expands to the explicit instantiation definition(s) of exactly one
// member for the given result_type. Members that are themselves templates on the
// input element type are instantiated twice: once for unsigned char and once for float.
// The macros deliberately end without a semicolon; the caller supplies it.
24 #define IS_INPUT_QUEUE_EMPTY(result_type)                                        \
25         template bool AsyncManager<result_type>::isInputQueueEmpty<unsigned char>(); \
26         template bool AsyncManager<result_type>::isInputQueueEmpty<float>()
27 #define INVOKE(result_type)                                           \
28         template void AsyncManager<result_type>::invoke<unsigned char>(); \
29         template void AsyncManager<result_type>::invoke<float>()
30 #define POP_FROM_OUTPUT(result_type) template result_type AsyncManager<result_type>::popFromOutput()
31 #define PUSH_TO_INPUT(result_type)                                                                                    \
32         template void AsyncManager<result_type>::pushToInput<unsigned char>(AsyncInputQueue<unsigned char> & inputQueue); \
33         template void AsyncManager<result_type>::pushToInput<float>(AsyncInputQueue<float> & inputQueue)
34 #define STOP(result_type) template void AsyncManager<result_type>::stop()
35 #define ASYNC_MANAGER(result_type) template AsyncManager<result_type>::AsyncManager(const CallbackType &cb)
36 #define IS_WORKING(result_type) template bool AsyncManager<result_type>::isWorking()
37 #define POP_FROM_INPUT(result_type)                                                    \
38         template AsyncInputQueue<unsigned char> AsyncManager<result_type>::popFromInput(); \
39         template AsyncInputQueue<float> AsyncManager<result_type>::popFromInput()
40 #define PUSH_TO_OUTPUT(result_type) template void AsyncManager<result_type>::pushToOutput(result_type &output)
41 #define INFERENCE_THREAD_LOOP(result_type)                                         \
42         template void AsyncManager<result_type>::inferenceThreadLoop<unsigned char>(); \
43         template void AsyncManager<result_type>::inferenceThreadLoop<float>()
44 #define PUSH(result_type)                                                                            \
45         template void AsyncManager<result_type>::push(std::vector<std::vector<unsigned char> > &inputs); \
46         template void AsyncManager<result_type>::push(std::vector<std::vector<float> > &inputs)
47 #define POP(result_type) template result_type AsyncManager<result_type>::pop()
48
49 using namespace std;
50 using namespace mediavision::machine_learning::exception;
51
52 namespace mediavision
53 {
54 namespace machine_learning
55 {
56 template<typename R> AsyncManager<R>::AsyncManager(const CallbackType &cb) : _callback(cb)
57 {}
58
59 template<typename R> bool AsyncManager<R>::isWorking()
60 {
61         if (_exit_thread)
62                 return false;
63
64         return true;
65 }
66
67 template<typename R> template<typename T> void AsyncManager<R>::pushToInput(AsyncInputQueue<T> &inputQueue)
68 {
69         lock_guard<mutex> lock(_incoming_queue_mutex);
70         AsyncInputQueue<unsigned char> dstQueue;
71
72         dstQueue.frame_number = inputQueue.frame_number;
73
74         for (auto &elms : inputQueue.inputs) {
75                 vector<unsigned char> dst_vector;
76
77                 for (auto &elm : elms) {
78                         unsigned char *bytes = reinterpret_cast<unsigned char *>(&elm);
79
80                         copy_n(bytes, sizeof(T), back_inserter(dst_vector));
81                 }
82
83                 dstQueue.inputs.push_back(dst_vector);
84         }
85
86         _incoming_queue.push(dstQueue);
87         _incoming_queue_event.notify_one();
88 }
89
90 template<typename R> template<typename T> AsyncInputQueue<T> AsyncManager<R>::popFromInput()
91 {
92         lock_guard<mutex> lock(_incoming_queue_mutex);
93         AsyncInputQueue<unsigned char> inputQueue = _incoming_queue.front();
94
95         _incoming_queue.pop();
96         AsyncInputQueue<T> dstQueue;
97
98         dstQueue.frame_number = inputQueue.frame_number;
99
100         for (auto &elms : inputQueue.inputs) {
101                 vector<T> dst_vector;
102
103                 for (size_t idx = 0; idx < elms.size(); idx += sizeof(T)) {
104                         T dst_data;
105
106                         copy_n(elms.begin() + idx, sizeof(T), reinterpret_cast<unsigned char *>(&dst_data));
107                         dst_vector.push_back(dst_data);
108                 }
109
110                 dstQueue.inputs.push_back(dst_vector);
111         }
112
113         return dstQueue;
114 }
115
116 template<typename R> template<typename T> bool AsyncManager<R>::isInputQueueEmpty()
117 {
118         lock_guard<mutex> lock(_incoming_queue_mutex);
119
120         return _incoming_queue.empty();
121 }
122
123 template<typename R> void AsyncManager<R>::waitforInputQueue()
124 {
125         unique_lock<mutex> lock(_incoming_queue_mutex);
126
127         if (!_incoming_queue_event.wait_for(lock, 1s, [this] { return !_incoming_queue.empty(); })) {
128                 LOGD("Waiting for input queue has been timed out.");
129         }
130 }
131
132 template<typename R> R AsyncManager<R>::popFromOutput()
133 {
134         lock_guard<mutex> lock(_outgoing_queue_mutex);
135         R output = _outgoing_queue.front();
136
137         _outgoing_queue.pop();
138
139         return output;
140 }
141
142 template<typename R> bool AsyncManager<R>::isOutputQueueEmpty()
143 {
144         lock_guard<mutex> lock(_outgoing_queue_mutex);
145
146         return _outgoing_queue.empty();
147 }
148
149 template<typename R> void AsyncManager<R>::waitforOutputQueue()
150 {
151         unique_lock<mutex> lock(_outgoing_queue_mutex);
152
153         if (!_outgoing_queue_event.wait_for(lock, 10s, [this] {
154                         if (_exit_thread)
155                                 throw InvalidOperation("thread is exited already.");
156
157                         return !_outgoing_queue.empty();
158                 })) {
159                 throw InvalidOperation("Waiting for output queue has been timed out.");
160         }
161 }
162
163 template<typename R> void AsyncManager<R>::pushToOutput(R &output)
164 {
165         lock_guard<mutex> lock(_outgoing_queue_mutex);
166
167         _outgoing_queue.push(output);
168         _outgoing_queue_event.notify_one();
169 }
170
171 template<typename R> template<typename T> void AsyncManager<R>::inferenceThreadLoop()
172 {
173         // If user called destroy API then this thread loop will be terminated.
174         while (!_exit_thread) {
175                 // Wait for input data.
176                 waitforInputQueue();
177
178                 // Exit this thread loop if user called destroy API while in thread loop.
179                 if (_exit_thread)
180                         break;
181
182                 // What the callback function has to do is to,
183                 // 1. Get input data from input queue.
184                 // 2. Run inference.
185                 // 3. Put output data to output queue.
186                 _callback();
187         }
188
189         // waitforOutputQueue function could wait for the notify event after while loop is exited.
190         // So make sure to call notify_one() here.
191         _outgoing_queue_event.notify_one();
192 }
193
194 template<typename R> template<typename T> void AsyncManager<R>::invoke()
195 {
196         if (!_thread_handle)
197                 _thread_handle = make_unique<thread>(&AsyncManager::inferenceThreadLoop<T>, this);
198 }
199
200 template<typename R> void AsyncManager<R>::stop()
201 {
202         // Make sure to wait for the completion of all inference requests in the incoming queue.
203         _exit_thread = true;
204
205         _thread_handle->join();
206         _thread_handle = nullptr;
207
208         lock_guard<mutex> lock(_outgoing_queue_mutex);
209         queue<R> empty;
210
211         swap(_outgoing_queue, empty);
212 }
213
214 template<typename R> template<typename T> void AsyncManager<R>::push(std::vector<std::vector<T> > &inputs)
215 {
216         _input_frame_number++;
217
218         if (!isInputQueueEmpty<T>()) {
219                 LOGD("input frame number(%ld) has been skipped.", _input_frame_number);
220                 return;
221         }
222
223         AsyncInputQueue<T> in_queue = { _input_frame_number, inputs };
224
225         pushToInput<T>(in_queue);
226
227         LOGD("Pushed : input frame number = %lu", in_queue.frame_number);
228
229         invoke<T>();
230 }
231
232 template<typename R> R AsyncManager<R>::pop()
233 {
234         waitforOutputQueue();
235
236         return popFromOutput();
237 }
238
// Explicit instantiations of AsyncManager for ObjectDetectionResult.
// Each line expands (via the macros defined at the top of this file) to the
// explicit instantiation definition(s) of one member; the trailing semicolon
// terminates the last declaration the macro expands to.
// NOTE(review): there is no instantiation macro for isOutputQueueEmpty,
// waitforInputQueue or waitforOutputQueue; the latter two are called by members
// instantiated here, while isOutputQueueEmpty is not referenced in this file -
// confirm it is not needed by other translation units.
239 IS_INPUT_QUEUE_EMPTY(ObjectDetectionResult);
240 INVOKE(ObjectDetectionResult);
241 POP_FROM_OUTPUT(ObjectDetectionResult);
242 PUSH_TO_INPUT(ObjectDetectionResult);
243 STOP(ObjectDetectionResult);
244 ASYNC_MANAGER(ObjectDetectionResult);
245 IS_WORKING(ObjectDetectionResult);
246 POP_FROM_INPUT(ObjectDetectionResult);
247 PUSH_TO_OUTPUT(ObjectDetectionResult);
248 INFERENCE_THREAD_LOOP(ObjectDetectionResult);
249 PUSH(ObjectDetectionResult);
250 POP(ObjectDetectionResult);
251
252 }
253 }