2 * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <nnstreamer/nnstreamer.h>
21 #include "common/tools.h"
22 #include "ml_pipeline_custom_filter.h"
25 using common::ErrorCode;
26 using common::PlatformResult;
27 using common::tools::ReportError;
28 using common::tools::ReportSuccess;
// Keys of the fields used in JSON messages exchanged with the JavaScript layer.
32 const std::string kCallbackId = "callbackId";
33 const std::string kListenerId = "listenerId";
34 const std::string kRequestId = "requestId";
35 const std::string kTensorsDataId = "tensorsDataId";
36 const std::string kTensorsInfoId = "tensorsInfoId";
// Status codes returned to the native pipeline from CustomFilterListener().
// NOTE(review): presumably these follow the ml_custom_easy_filter callback
// contract (negative: error, 0: success, 1: ignore the data) — confirm
// against the nnstreamer documentation.
44 const int CustomFilter::kCustomFilterError = -1;
45 const int CustomFilter::kCustomFilterIgnoreData = 1;
46 const int CustomFilter::kCustomFilterSuccess = 0;
// Bundles a single answer received from the JavaScript custom-filter callback:
// the status reported by JS, the id of the JS callback to invoke with the
// result, and (optionally) the tensors data produced by JS. The manager
// pointers are non-owning; they are kept so the destructor can release the
// attached tensors data and its TensorsInfo.
48 CustomFilter::JSResponse::JSResponse(int status, int callback_id, TensorsData* tensors_data_ptr,
49 TensorsDataManager* tensors_data_manager_ptr,
50 TensorsInfoManager* tensors_info_manager_ptr)
52 callback_id{callback_id},
53 tensors_data_ptr{tensors_data_ptr},
54 tensors_data_manager_ptr{tensors_data_manager_ptr},
55 tensors_info_manager_ptr{tensors_info_manager_ptr} {
56 ScopeLogger("status: [%d], callback_id: [%d], tensors_data_ptr: %p]", status, callback_id,
// Releases the tensors data (and the TensorsInfo it references) attached to
// this response, if any.
60 CustomFilter::JSResponse::~JSResponse() {
61 ScopeLogger("status: [%d], callback_id: [%d], tensors_data_ptr: [%p]", status, callback_id,
// A null tensors_data_ptr means there is nothing to release: either no data
// was attached (error/ignore responses) or ownership was transferred away by
// the move constructor.
64 if (!tensors_data_ptr) {
67 // We ignore errors, because we can't do anything about them and these methods
68 // will log error messages
69 tensors_info_manager_ptr->DisposeTensorsInfo(tensors_data_ptr->TensorsInfoId());
70 tensors_data_manager_ptr->DisposeTensorsData(tensors_data_ptr);
// Move constructor: transfers ownership of the tensors data so that exactly
// one JSResponse instance disposes of it.
73 CustomFilter::JSResponse::JSResponse(JSResponse&& other)
74 : status{other.status},
75 callback_id{other.callback_id},
76 tensors_data_ptr{other.tensors_data_ptr},
77 tensors_data_manager_ptr{other.tensors_data_manager_ptr},
78 tensors_info_manager_ptr{other.tensors_info_manager_ptr} {
// Clear the source pointer so the moved-from object's destructor does not
// dispose of the data a second time.
79 other.tensors_data_ptr = nullptr;
// Factory: clones the caller's input/output TensorsInfo, allocates a
// CustomFilter and registers it with the native pipeline via
// ml_pipeline_custom_easy_filter_register(). On success, ownership of the new
// filter is transferred to |out|; on failure an error PlatformResult is
// returned and |out| is left untouched.
82 PlatformResult CustomFilter::CreateAndRegisterCustomFilter(
83 const std::string& name, const std::string& listener_name, TensorsInfo* input_tensors_info_ptr,
84 TensorsInfo* output_tensors_info_ptr, common::Instance* instance_ptr,
85 TensorsInfoManager* tensors_info_manager_ptr, TensorsDataManager* tensors_data_manager_ptr,
86 std::unique_ptr<CustomFilter>* out) {
88 "name: [%s], listener_name: [%s], input_tensors_info::id: [%d], "
89 "output_tensors_info::id: [%d]",
90 name.c_str(), listener_name.c_str(), input_tensors_info_ptr->Id(),
91 output_tensors_info_ptr->Id());
// The infos are cloned so the filter owns copies whose lifetime is
// independent of the caller's objects; the clones are released in
// ~CustomFilter().
93 auto* input_tensors_info_clone_ptr =
94 tensors_info_manager_ptr->CloneTensorsInfo(input_tensors_info_ptr);
95 if (!input_tensors_info_clone_ptr) {
96 return LogAndCreateResult(
97 ErrorCode::ABORT_ERR, "Could not register custom filter",
98 ("Could not clone TensorsInfo with id: [%d]", input_tensors_info_ptr->Id()));
101 auto* output_tensors_info_clone_ptr =
102 tensors_info_manager_ptr->CloneTensorsInfo(output_tensors_info_ptr);
103 if (!output_tensors_info_clone_ptr) {
104 return LogAndCreateResult(
105 ErrorCode::ABORT_ERR, "Could not register custom filter",
106 ("Could not clone TensorsInfo with id: [%d]", output_tensors_info_ptr->Id()));
// std::this_thread::get_id() records the creating thread as the "main"
// thread; PrepareMessageWithInputData() later uses it to verify that the
// native listener is not invoked on this thread.
109 auto custom_filter_ptr = std::unique_ptr<CustomFilter>(new (std::nothrow) CustomFilter{
110 name, listener_name, input_tensors_info_clone_ptr, output_tensors_info_clone_ptr,
111 instance_ptr, tensors_info_manager_ptr, tensors_data_manager_ptr,
112 std::this_thread::get_id()});
114 if (!custom_filter_ptr) {
115 return LogAndCreateResult(ErrorCode::ABORT_ERR, "Could not register custom filter",
116 ("Could not allocate memory"));
// The CustomFilter instance itself is handed to the native API as the
// listener's user_data.
// NOTE(review): registration uses the ORIGINAL infos' handles, while the
// filter stores the clones — confirm this is intentional.
119 ml_custom_easy_filter_h custom_filter_handle = nullptr;
120 auto ret = ml_pipeline_custom_easy_filter_register(
121 name.c_str(), input_tensors_info_ptr->Handle(), output_tensors_info_ptr->Handle(),
122 CustomFilterListener, static_cast<void*>(custom_filter_ptr.get()), &custom_filter_handle);
123 if (ML_ERROR_NONE != ret) {
124 LoggerE("ml_pipeline_custom_easy_filter_register() failed: [%d] (%s)", ret,
125 get_error_message(ret));
126 return util::ToPlatformResult(ret, "Could not register custom filter");
128 LoggerD("ml_pipeline_custom_easy_filter_register() succeeded");
129 custom_filter_ptr->custom_filter_ = custom_filter_handle;
131 *out = std::move(custom_filter_ptr);
133 return PlatformResult{};
// Stores the filter's configuration. The TensorsInfo pointers are the clones
// owned by this object (released in the destructor); the instance and manager
// pointers are non-owning. |main_thread_id| identifies the thread that
// created the filter and is later compared against the listener's thread id.
136 CustomFilter::CustomFilter(const std::string& name, const std::string& listener_name,
137 TensorsInfo* input_tensors_info, TensorsInfo* output_tensors_info,
138 common::Instance* instance_ptr,
139 TensorsInfoManager* tensors_info_manager_ptr,
140 TensorsDataManager* tensors_data_manager_ptr,
141 const std::thread::id& main_thread_id)
143 listener_name_{listener_name},
144 input_tensors_info_ptr_{input_tensors_info},
145 output_tensors_info_ptr_{output_tensors_info},
// custom_filter_ stays null until registration succeeds in
// CreateAndRegisterCustomFilter().
146 custom_filter_{nullptr},
147 instance_ptr_{instance_ptr},
148 tensors_info_manager_ptr_{tensors_info_manager_ptr},
149 tensors_data_manager_ptr_{tensors_data_manager_ptr},
150 main_thread_id_{main_thread_id} {
152 "name_: [%s], listener_name_: [%s], input_tensors_info::id: [%d], "
153 "output_tensors_info::id: [%d]",
154 name_.c_str(), listener_name_.c_str(), input_tensors_info_ptr_->Id(),
155 output_tensors_info_ptr_->Id());
// Releases the cloned input/output TensorsInfo objects owned by the filter.
// NOTE(review): custom_filter_ is not unregistered here — presumably
// Unregister() is expected to be called before destruction; confirm against
// the callers.
158 CustomFilter::~CustomFilter() {
159 ScopeLogger("name: [%s]_, listener_name_: [%s], custom_filter_: [%p]", name_.c_str(),
160 listener_name_.c_str(), custom_filter_);
163 tensors_info_manager_ptr_->DisposeTensorsInfo(input_tensors_info_ptr_);
164 tensors_info_manager_ptr_->DisposeTensorsInfo(output_tensors_info_ptr_);
// Unregisters the filter from the native pipeline. Safe to call repeatedly:
// when the handle is already null the call is a no-op that returns success.
167 PlatformResult CustomFilter::Unregister() {
168 ScopeLogger("name_: [%s], listener_name_: [%s], custom_filter_: [%p]", name_.c_str(),
169 listener_name_.c_str(), custom_filter_);
171 if (!custom_filter_) {
172 LoggerD("CustomFilter was already unregistered");
173 return PlatformResult{};
176 auto ret = ml_pipeline_custom_easy_filter_unregister(custom_filter_);
177 if (ML_ERROR_NONE != ret) {
178 LoggerE("ml_pipeline_custom_easy_filter_unregister() failed: [%d] (%s)", ret,
179 get_error_message(ret));
180 return util::ToPlatformResult(ret, "Could not unregister custom_filter");
182 LoggerD("ml_pipeline_custom_easy_filter_unregister() succeeded");
// Clearing the handle marks the filter as unregistered for later calls.
184 custom_filter_ = nullptr;
186 return PlatformResult{};
// Called when the JS side delivers the result for |request_id|. Stores the
// response in request_id_to_js_response_ so the pipeline thread blocked in
// CustomFilterListener() can pick it up.
189 void CustomFilter::NotifyAboutJSResponse(int request_id, int status, int callback_id,
190 TensorsData* tensors_data_ptr) {
191 ScopeLogger("request_id: [%d], status: [%d], callback_id: [%d], tensors_data_ptr: [%p]",
192 request_id, status, callback_id, tensors_data_ptr);
// NOTE(review): as written, this std::lock_guard is an unnamed temporary —
// it is constructed and destroyed within this single statement, so the mutex
// is released before the emplace below runs. It should likely be a named
// local, e.g. std::lock_guard<std::mutex> lock{request_id_to_js_response_mutex_};
// Confirm against the full source.
194 std::lock_guard<std::mutex>{request_id_to_js_response_mutex_};
195 request_id_to_js_response_.emplace(
196 request_id, JSResponse{status, callback_id, tensors_data_ptr, tensors_data_manager_ptr_,
197 tensors_info_manager_ptr_});
// Produces a unique id for one listener-to-JS request; the std::atomic_int
// counter makes id generation safe across concurrent listener threads.
201 int CustomFilter::getRequestId() {
204 static std::atomic_int data_id{0};
// Builds the JSON message sent to JS for one invocation of the custom filter:
// the listener id, the ids of a cloned copy of the input tensors data and its
// TensorsInfo, and a freshly generated request id (also returned through
// |out_request_id|). On failure an error is reported into |out_message|;
// presumably the function then returns false — confirm in the full source.
208 bool CustomFilter::PrepareMessageWithInputData(const ml_tensors_data_h input_tensors_data,
209 picojson::value* out_message, int* out_request_id) {
210 ScopeLogger("input_tensors_data: [%p]", input_tensors_data);
212 auto& message_obj = out_message->get<picojson::object>();
213 message_obj[kListenerId] = picojson::value{listener_name_};
216 * According to native API devs, the custom filter callback runs in the thread other than
219 * As we put the thread, in which the callback is called, to sleep, we test if this assumption
220 * is correct - if not, going to sleep in the main thread would stop the app.
222 const auto listener_thread_id = std::this_thread::get_id();
223 if (main_thread_id_ == listener_thread_id) {
224 LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
225 &message_obj, ("CustomFilterListener is called from the main thread"));
// The input data is cloned because the native buffer is only valid for the
// duration of the callback, while JS processes the data asynchronously.
// (TODO confirm this lifetime assumption against the nnstreamer docs.)
229 auto* input_tensors_data_clone_ptr = tensors_info_manager_ptr_->CloneNativeTensorWithData(
230 input_tensors_info_ptr_->Handle(), input_tensors_data);
231 if (!input_tensors_data_clone_ptr) {
232 LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
234 ("Could not clone tensors data. Custom filter won't be triggered."));
// JS receives only ids; the actual data stays on the native side and is
// looked up through the managers.
238 message_obj[kTensorsDataId] =
239 picojson::value{static_cast<double>(input_tensors_data_clone_ptr->Id())};
240 message_obj[kTensorsInfoId] =
241 picojson::value{static_cast<double>(input_tensors_data_clone_ptr->TensorsInfoId())};
242 *out_request_id = getRequestId();
243 message_obj[kRequestId] = picojson::value{static_cast<double>(*out_request_id)};
// Stage 3 of data processing: validates the response received from JS and,
// on success, copies the JS-produced tensors into the native
// |output_tensors_data| buffer. Also fills |out_response_to_js| with the
// success/error reply (always carrying the JS callback id). Returns the
// status code to be handed back to the native pipeline.
248 int CustomFilter::CopyJsFilterOutputToNativeObject(int request_id, const JSResponse& js_response,
249 ml_tensors_data_h output_tensors_data,
250 picojson::value* out_response_to_js) {
251 ScopeLogger("request_id: [%d]", request_id);
253 auto& response_to_js_obj = out_response_to_js->get<picojson::object>();
254 response_to_js_obj[kCallbackId] = picojson::value{static_cast<double>(js_response.callback_id)};
256 int custom_filter_status = kCustomFilterError;
// "Ignore" and negative statuses are passed through to the pipeline as-is;
// no tensor copying is needed for them.
257 if (kCustomFilterIgnoreData == js_response.status || js_response.status < 0) {
259 * Although js_response.status < 0 means "error", we respond with "success" message
260 * to JS, because this status came from JS and the problem, if any, is already handled there.
262 ReportSuccess(response_to_js_obj);
263 custom_filter_status = js_response.status;
264 } else if (kCustomFilterSuccess == js_response.status) {
265 auto* js_response_tensors_info_ptr =
266 tensors_info_manager_ptr_->GetTensorsInfo(js_response.tensors_data_ptr->TensorsInfoId());
268 if (!js_response_tensors_info_ptr) {
269 LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
271 ("Could not get tensors info. Custom filter won't be triggered."));
272 return kCustomFilterError;
// The JS output must match the TensorsInfo declared for this filter's
// output when it was registered.
275 if (!output_tensors_info_ptr_->Equals(js_response_tensors_info_ptr)) {
276 LogAndReportError(PlatformResult(ErrorCode::INVALID_VALUES_ERR,
277 "Output's TensorsInfo is not equal to expected"),
278 &response_to_js_obj);
279 return kCustomFilterError;
// Copy each tensor's raw bytes from the JS-produced data into the native
// output buffer, one tensor at a time.
282 auto tensors_count = js_response_tensors_info_ptr->Count();
283 for (int i = 0; i < tensors_count; ++i) {
284 void* data = nullptr;
285 size_t data_size = 0;
286 auto ret = ml_tensors_data_get_tensor_data(js_response.tensors_data_ptr->Handle(), i, &data,
288 if (ML_ERROR_NONE != ret) {
289 LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
291 ("ml_tensors_data_get_tensor_data() failed: [%d] (%s), i: [%d]", ret,
292 get_error_message(ret), i));
293 return kCustomFilterError;
296 ret = ml_tensors_data_set_tensor_data(output_tensors_data, i, data, data_size);
297 if (ML_ERROR_NONE != ret) {
298 LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
300 ("ml_tensors_data_set_tensor_data() failed: [%d] (%s), i: [%d]", ret,
301 get_error_message(ret), i));
302 return kCustomFilterError;
306 custom_filter_status = kCustomFilterSuccess;
// Any other status value coming from JS is unexpected — report an error.
308 ReportError(PlatformResult{ErrorCode::ABORT_ERR, "Internal Customfilter error"},
309 &response_to_js_obj);
310 custom_filter_status = kCustomFilterError;
313 return custom_filter_status;
// Native callback registered with ml_pipeline_custom_easy_filter_register();
// |user_data| carries the owning CustomFilter. It posts the input data to JS,
// blocks this (pipeline) thread on cv_ until NotifyAboutJSResponse() stores
// the matching response, then copies the JS output into |output_tensors_data|
// and posts the final reply to JS. The returned status is consumed by the
// pipeline (kCustomFilterError stops it).
316 int CustomFilter::CustomFilterListener(const ml_tensors_data_h input_tensors_data,
317 ml_tensors_data_h output_tensors_data, void* user_data) {
318 ScopeLogger("input_tensors_data: [%p], tensors_info_out: [%p], user_data: [%p]",
319 input_tensors_data, output_tensors_data, user_data);
322 LoggerE("user_data is a nullptr");
326 CustomFilter* custom_filter_ptr = static_cast<CustomFilter*>(user_data);
328 picojson::value message{picojson::object{}};
331 custom_filter_ptr->PrepareMessageWithInputData(input_tensors_data, &message, &request_id);
// This unique_lock protects request_id_to_js_response_ and is the lock that
// cv_.wait() releases while this thread sleeps awaiting the JS response.
333 std::unique_lock<std::mutex> lock{custom_filter_ptr->request_id_to_js_response_mutex_};
334 common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, message);
337 LoggerE("An error occurred during input processing. Pipeline will stop");
338 return kCustomFilterError;
342 * Stage 1. of data processing (see the comment above customFilterWrapper in
343 * ml_pipeline.js) ends here.
345 * We put this thread to sleep and wait for the response from JS
// The predicate guards against spurious wakeups: only proceed once a
// response keyed by this request_id has been stored.
348 custom_filter_ptr->cv_.wait(lock, [custom_filter_ptr, request_id]() {
349 return custom_filter_ptr->request_id_to_js_response_.count(request_id) != 0;
353 * Stage 3. of data processing starts here.
// Move the response out of the map and erase the entry; the moved-to
// JSResponse releases the attached tensors data when it goes out of scope.
355 auto js_response{std::move(custom_filter_ptr->request_id_to_js_response_[request_id])};
356 custom_filter_ptr->request_id_to_js_response_.erase(request_id);
359 picojson::value response_to_js{picojson::object{}};
360 auto custom_filter_status = custom_filter_ptr->CopyJsFilterOutputToNativeObject(
361 request_id, js_response, output_tensors_data, &response_to_js);
363 common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, response_to_js);
365 return custom_filter_status;
368 } // namespace pipeline
370 } // namespace extension