[ML][Pipeline] Implement CustomFilter callback
[platform/core/api/webapi-plugins.git] / src / ml / ml_pipeline_custom_filter.cc
1 /*
2  * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  *    Licensed under the Apache License, Version 2.0 (the "License");
5  *    you may not use this file except in compliance with the License.
6  *    You may obtain a copy of the License at
7  *
8  *        http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *    Unless required by applicable law or agreed to in writing, software
11  *    distributed under the License is distributed on an "AS IS" BASIS,
12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *    See the License for the specific language governing permissions and
14  *    limitations under the License.
15  */
16
17 #include <nnstreamer/nnstreamer.h>
18 #include <atomic>
19 #include <utility>
20
21 #include "common/tools.h"
22 #include "ml_pipeline_custom_filter.h"
23 #include "ml_utils.h"
24
25 using common::ErrorCode;
26 using common::PlatformResult;
27 using common::tools::ReportError;
28 using common::tools::ReportSuccess;
29
30 namespace {
31
32 const std::string kCallbackId = "callbackId";
33 const std::string kListenerId = "listenerId";
34 const std::string kRequestId = "requestId";
35 const std::string kTensorsDataId = "tensorsDataId";
36 const std::string kTensorsInfoId = "tensorsInfoId";
37
38 }  //  namespace
39
40 namespace extension {
41 namespace ml {
42 namespace pipeline {
43
44 const int CustomFilter::kCustomFilterError = -1;
45 const int CustomFilter::kCustomFilterIgnoreData = 1;
46 const int CustomFilter::kCustomFilterSuccess = 0;
47
48 CustomFilter::JSResponse::JSResponse(int status, int callback_id, TensorsData* tensors_data_ptr,
49                                      TensorsDataManager* tensors_data_manager_ptr,
50                                      TensorsInfoManager* tensors_info_manager_ptr)
51     : status{status},
52       callback_id{callback_id},
53       tensors_data_ptr{tensors_data_ptr},
54       tensors_data_manager_ptr{tensors_data_manager_ptr},
55       tensors_info_manager_ptr{tensors_info_manager_ptr} {
56   ScopeLogger("status: [%d], callback_id: [%d], tensors_data_ptr: %p]", status, callback_id,
57               tensors_data_ptr);
58 }
59
60 CustomFilter::JSResponse::~JSResponse() {
61   ScopeLogger("status: [%d], callback_id: [%d], tensors_data_ptr: [%p]", status, callback_id,
62               tensors_data_ptr);
63
64   if (!tensors_data_ptr) {
65     return;
66   }
67   // We ignore errors, because we can't do anything about them and these methods
68   // will log error messages
69   tensors_info_manager_ptr->DisposeTensorsInfo(tensors_data_ptr->TensorsInfoId());
70   tensors_data_manager_ptr->DisposeTensorsData(tensors_data_ptr);
71 }
72
73 CustomFilter::JSResponse::JSResponse(JSResponse&& other)
74     : status{other.status},
75       callback_id{other.callback_id},
76       tensors_data_ptr{other.tensors_data_ptr},
77       tensors_data_manager_ptr{other.tensors_data_manager_ptr},
78       tensors_info_manager_ptr{other.tensors_info_manager_ptr} {
79   other.tensors_data_ptr = nullptr;
80 }
81
82 PlatformResult CustomFilter::CreateAndRegisterCustomFilter(
83     const std::string& name, const std::string& listener_name, TensorsInfo* input_tensors_info_ptr,
84     TensorsInfo* output_tensors_info_ptr, common::Instance* instance_ptr,
85     TensorsInfoManager* tensors_info_manager_ptr, TensorsDataManager* tensors_data_manager_ptr,
86     std::unique_ptr<CustomFilter>* out) {
87   ScopeLogger(
88       "name: [%s], listener_name: [%s], input_tensors_info::id: [%d], "
89       "output_tensors_info::id: [%d]",
90       name.c_str(), listener_name.c_str(), input_tensors_info_ptr->Id(),
91       output_tensors_info_ptr->Id());
92
93   auto* input_tensors_info_clone_ptr =
94       tensors_info_manager_ptr->CloneTensorsInfo(input_tensors_info_ptr);
95   if (!input_tensors_info_clone_ptr) {
96     return LogAndCreateResult(
97         ErrorCode::ABORT_ERR, "Could not register custom filter",
98         ("Could not clone TensorsInfo with id: [%d]", input_tensors_info_ptr->Id()));
99   }
100
101   auto* output_tensors_info_clone_ptr =
102       tensors_info_manager_ptr->CloneTensorsInfo(output_tensors_info_ptr);
103   if (!output_tensors_info_clone_ptr) {
104     return LogAndCreateResult(
105         ErrorCode::ABORT_ERR, "Could not register custom filter",
106         ("Could not clone TensorsInfo with id: [%d]", output_tensors_info_ptr->Id()));
107   }
108
109   auto custom_filter_ptr = std::unique_ptr<CustomFilter>(new (std::nothrow) CustomFilter{
110       name, listener_name, input_tensors_info_clone_ptr, output_tensors_info_clone_ptr,
111       instance_ptr, tensors_info_manager_ptr, tensors_data_manager_ptr,
112       std::this_thread::get_id()});
113   ;
114   if (!custom_filter_ptr) {
115     return LogAndCreateResult(ErrorCode::ABORT_ERR, "Could not register custom filter",
116                               ("Could not allocate memory"));
117   }
118
119   ml_custom_easy_filter_h custom_filter_handle = nullptr;
120   auto ret = ml_pipeline_custom_easy_filter_register(
121       name.c_str(), input_tensors_info_ptr->Handle(), output_tensors_info_ptr->Handle(),
122       CustomFilterListener, static_cast<void*>(custom_filter_ptr.get()), &custom_filter_handle);
123   if (ML_ERROR_NONE != ret) {
124     LoggerE("ml_pipeline_custom_easy_filter_register() failed: [%d] (%s)", ret,
125             get_error_message(ret));
126     return util::ToPlatformResult(ret, "Could not register custom filter");
127   }
128   LoggerD("ml_pipeline_custom_easy_filter_register() succeeded");
129   custom_filter_ptr->custom_filter_ = custom_filter_handle;
130
131   *out = std::move(custom_filter_ptr);
132
133   return PlatformResult{};
134 }
135
136 CustomFilter::CustomFilter(const std::string& name, const std::string& listener_name,
137                            TensorsInfo* input_tensors_info, TensorsInfo* output_tensors_info,
138                            common::Instance* instance_ptr,
139                            TensorsInfoManager* tensors_info_manager_ptr,
140                            TensorsDataManager* tensors_data_manager_ptr,
141                            const std::thread::id& main_thread_id)
142     : name_{name},
143       listener_name_{listener_name},
144       input_tensors_info_ptr_{input_tensors_info},
145       output_tensors_info_ptr_{output_tensors_info},
146       custom_filter_{nullptr},
147       instance_ptr_{instance_ptr},
148       tensors_info_manager_ptr_{tensors_info_manager_ptr},
149       tensors_data_manager_ptr_{tensors_data_manager_ptr},
150       main_thread_id_{main_thread_id} {
151   ScopeLogger(
152       "name_: [%s], listener_name_: [%s], input_tensors_info::id: [%d], "
153       "output_tensors_info::id: [%d]",
154       name_.c_str(), listener_name_.c_str(), input_tensors_info_ptr_->Id(),
155       output_tensors_info_ptr_->Id());
156 }
157
158 CustomFilter::~CustomFilter() {
159   ScopeLogger("name: [%s]_, listener_name_: [%s], custom_filter_: [%p]", name_.c_str(),
160               listener_name_.c_str(), custom_filter_);
161
162   Unregister();
163   tensors_info_manager_ptr_->DisposeTensorsInfo(input_tensors_info_ptr_);
164   tensors_info_manager_ptr_->DisposeTensorsInfo(output_tensors_info_ptr_);
165 }
166
167 PlatformResult CustomFilter::Unregister() {
168   ScopeLogger("name_: [%s], listener_name_: [%s], custom_filter_: [%p]", name_.c_str(),
169               listener_name_.c_str(), custom_filter_);
170
171   if (!custom_filter_) {
172     LoggerD("CustomFilter was already unregistered");
173     return PlatformResult{};
174   }
175
176   auto ret = ml_pipeline_custom_easy_filter_unregister(custom_filter_);
177   if (ML_ERROR_NONE != ret) {
178     LoggerE("ml_pipeline_custom_easy_filter_unregister() failed: [%d] (%s)", ret,
179             get_error_message(ret));
180     return util::ToPlatformResult(ret, "Could not unregister custom_filter");
181   }
182   LoggerD("ml_pipeline_custom_easy_filter_unregister() succeeded");
183
184   custom_filter_ = nullptr;
185
186   return PlatformResult{};
187 }
188
189 void CustomFilter::NotifyAboutJSResponse(int request_id, int status, int callback_id,
190                                          TensorsData* tensors_data_ptr) {
191   ScopeLogger("request_id: [%d], status: [%d], callback_id: [%d], tensors_data_ptr: [%p]",
192               request_id, status, callback_id, tensors_data_ptr);
193
194   std::lock_guard<std::mutex>{request_id_to_js_response_mutex_};
195   request_id_to_js_response_.emplace(
196       request_id, JSResponse{status, callback_id, tensors_data_ptr, tensors_data_manager_ptr_,
197                              tensors_info_manager_ptr_});
198   cv_.notify_all();
199 }
200
201 int CustomFilter::getRequestId() {
202   ScopeLogger();
203
204   static std::atomic_int data_id{0};
205   return data_id++;
206 }
207
208 bool CustomFilter::PrepareMessageWithInputData(const ml_tensors_data_h input_tensors_data,
209                                                picojson::value* out_message, int* out_request_id) {
210   ScopeLogger("input_tensors_data: [%p]", input_tensors_data);
211
212   auto& message_obj = out_message->get<picojson::object>();
213   message_obj[kListenerId] = picojson::value{listener_name_};
214
215   /*
216    * According to native API devs, the custom filter callback runs in the thread other than
217    * the main.
218    *
219    * As we put the thread, in which the callback is called, to sleep, we test if this assumption
220    * is correct - if not, going to sleep in the main thread would stop the app.
221    */
222   const auto listener_thread_id = std::this_thread::get_id();
223   if (main_thread_id_ == listener_thread_id) {
224     LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
225                       &message_obj, ("CustomFilterListener is called from the main thread"));
226     return false;
227   }
228
229   auto* input_tensors_data_clone_ptr = tensors_info_manager_ptr_->CloneNativeTensorWithData(
230       input_tensors_info_ptr_->Handle(), input_tensors_data);
231   if (!input_tensors_data_clone_ptr) {
232     LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
233                       &message_obj,
234                       ("Could not clone tensors data. Custom filter won't be triggered."));
235     return false;
236   }
237
238   message_obj[kTensorsDataId] =
239       picojson::value{static_cast<double>(input_tensors_data_clone_ptr->Id())};
240   message_obj[kTensorsInfoId] =
241       picojson::value{static_cast<double>(input_tensors_data_clone_ptr->TensorsInfoId())};
242   *out_request_id = getRequestId();
243   message_obj[kRequestId] = picojson::value{static_cast<double>(*out_request_id)};
244
245   return true;
246 }
247
248 int CustomFilter::CopyJsFilterOutputToNativeObject(int request_id, const JSResponse& js_response,
249                                                    ml_tensors_data_h output_tensors_data,
250                                                    picojson::value* out_response_to_js) {
251   ScopeLogger("request_id: [%d]", request_id);
252
253   auto& response_to_js_obj = out_response_to_js->get<picojson::object>();
254   response_to_js_obj[kCallbackId] = picojson::value{static_cast<double>(js_response.callback_id)};
255
256   int custom_filter_status = kCustomFilterError;
257   if (kCustomFilterIgnoreData == js_response.status || js_response.status < 0) {
258     /*
259      * Although js_response.status < 0 means "error", we respond with "success" message
260      * to JS, because this status came from JS and the problem, if any, is already handled there.
261      */
262     ReportSuccess(response_to_js_obj);
263     custom_filter_status = js_response.status;
264   } else if (kCustomFilterSuccess == js_response.status) {
265     auto* js_response_tensors_info_ptr =
266         tensors_info_manager_ptr_->GetTensorsInfo(js_response.tensors_data_ptr->TensorsInfoId());
267
268     if (!js_response_tensors_info_ptr) {
269       LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
270                         &response_to_js_obj,
271                         ("Could not get tensors info. Custom filter won't be triggered."));
272       return kCustomFilterError;
273     }
274
275     if (!output_tensors_info_ptr_->Equals(js_response_tensors_info_ptr)) {
276       LogAndReportError(PlatformResult(ErrorCode::INVALID_VALUES_ERR,
277                                        "Output's TensorsInfo is not equal to expected"),
278                         &response_to_js_obj);
279       return kCustomFilterError;
280     }
281
282     auto tensors_count = js_response_tensors_info_ptr->Count();
283     for (int i = 0; i < tensors_count; ++i) {
284       void* data = nullptr;
285       size_t data_size = 0;
286       auto ret = ml_tensors_data_get_tensor_data(js_response.tensors_data_ptr->Handle(), i, &data,
287                                                  &data_size);
288       if (ML_ERROR_NONE != ret) {
289         LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
290                           &response_to_js_obj,
291                           ("ml_tensors_data_get_tensor_data() failed: [%d] (%s), i: [%d]", ret,
292                            get_error_message(ret), i));
293         return kCustomFilterError;
294       }
295
296       ret = ml_tensors_data_set_tensor_data(output_tensors_data, i, data, data_size);
297       if (ML_ERROR_NONE != ret) {
298         LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
299                           &response_to_js_obj,
300                           ("ml_tensors_data_set_tensor_data() failed: [%d] (%s), i: [%d]", ret,
301                            get_error_message(ret), i));
302         return kCustomFilterError;
303       }
304     }
305
306     custom_filter_status = kCustomFilterSuccess;
307   } else {
308     ReportError(PlatformResult{ErrorCode::ABORT_ERR, "Internal Customfilter error"},
309                 &response_to_js_obj);
310     custom_filter_status = kCustomFilterError;
311   }
312
313   return custom_filter_status;
314 }
315
316 int CustomFilter::CustomFilterListener(const ml_tensors_data_h input_tensors_data,
317                                        ml_tensors_data_h output_tensors_data, void* user_data) {
318   ScopeLogger("input_tensors_data: [%p], tensors_info_out: [%p], user_data: [%p]",
319               input_tensors_data, output_tensors_data, user_data);
320
321   if (!user_data) {
322     LoggerE("user_data is a nullptr");
323     return -1;
324   }
325
326   CustomFilter* custom_filter_ptr = static_cast<CustomFilter*>(user_data);
327
328   picojson::value message{picojson::object{}};
329   int request_id = -1;
330   auto success =
331       custom_filter_ptr->PrepareMessageWithInputData(input_tensors_data, &message, &request_id);
332
333   std::unique_lock<std::mutex> lock{custom_filter_ptr->request_id_to_js_response_mutex_};
334   common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, message);
335
336   if (!success) {
337     LoggerE("An error occurred during input processing. Pipeline will stop");
338     return kCustomFilterError;
339   }
340
341   /*
342    * Stage 1. of data processing (see the comment above customFilterWrapper in
343    * ml_pipeline.js) ends here.
344    *
345    * We put this thread to sleep and wait for the response from JS
346    * (the main thread).
347    */
348   custom_filter_ptr->cv_.wait(lock, [custom_filter_ptr, request_id]() {
349     return custom_filter_ptr->request_id_to_js_response_.count(request_id) != 0;
350   });
351
352   /*
353    * Stage 3. of data processing starts here.
354    */
355   auto js_response{std::move(custom_filter_ptr->request_id_to_js_response_[request_id])};
356   custom_filter_ptr->request_id_to_js_response_.erase(request_id);
357   lock.unlock();
358
359   picojson::value response_to_js{picojson::object{}};
360   auto custom_filter_status = custom_filter_ptr->CopyJsFilterOutputToNativeObject(
361       request_id, js_response, output_tensors_data, &response_to_js);
362
363   common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, response_to_js);
364
365   return custom_filter_status;
366 }
367
368 }  // namespace pipeline
369 }  // namespace ml
370 }  // namespace extension