From ae5524aa38d767ef413214927fdcd5b186d5d6d3 Mon Sep 17 00:00:00 2001 From: Pawel Wasowski Date: Mon, 1 Feb 2021 15:01:13 +0100 Subject: [PATCH] [ML][pipeline] Implement Pipeline::{register,unregister}SinkListener ACR: TWDAPI-274 [Verification] Tested in Chrome DevTools with the snippets below. Works fine var pipeline_def = "videotestsrc num-buffers=3 ! videoconvert" + " ! tensor_converter ! tensor_sink name=sinkx"; var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def, function(state) {console.log(state);}); // READY // PAUSED pipeline.registerSinkListener('sinkx', function(sinkName, data) { console.log('SinkListener for "' + sinkName + '" sink called'); console.log(data); }) pipeline.start() // registered SinkListener is called 3 times Change-Id: I8790327d070e54d27fd9cc028882773487a48d7a Signed-off-by: Pawel Wasowski --- src/ml/js/ml_pipeline.js | 87 ++++++++++++++++++++- src/ml/ml.gyp | 2 + src/ml/ml_instance.cc | 39 ++++++++++ src/ml/ml_instance.h | 4 +- src/ml/ml_pipeline.cc | 45 ++++++++++- src/ml/ml_pipeline.h | 12 ++- src/ml/ml_pipeline_manager.cc | 25 +++++- src/ml/ml_pipeline_manager.h | 5 +- src/ml/ml_pipeline_sink.cc | 140 ++++++++++++++++++++++++++++++++++ src/ml/ml_pipeline_sink.h | 71 +++++++++++++++++ 10 files changed, 415 insertions(+), 15 deletions(-) create mode 100644 src/ml/ml_pipeline_sink.cc create mode 100644 src/ml/ml_pipeline_sink.h diff --git a/src/ml/js/ml_pipeline.js b/src/ml/js/ml_pipeline.js index 5429ba7b..ef68dcd7 100755 --- a/src/ml/js/ml_pipeline.js +++ b/src/ml/js/ml_pipeline.js @@ -15,6 +15,7 @@ */ var kPipelineStateChangeListenerNamePrefix = 'MLPipelineStateChangeListener'; +var kSinkListenerNamePrefix = 'MLPipelineSinkListener'; //PipelineManager::createPipeline() begin var ValidPipelineDisposeExceptions = ['NotFoundError', 'NotSupportedError', 'AbortError']; @@ -398,13 +399,91 @@ Pipeline.prototype.getValve = function() { }; //Pipeline::getValve() end -//Pipeline::registerSinkCallback() begin +//Pipeline::registerSinkListener() begin +var ValidRegisterSinkListenerExceptions = [ + 'InvalidValuesError', + 'NotFoundError', + 'NotSupportedError', + 'TypeMismatchError', + 'AbortError' +]; +Pipeline.prototype.registerSinkListener = function() { + var args = validator_.validateArgs(arguments, [ + { + name: 'name', + type: validator_.Types.STRING + }, + { + name: 'sinkListener', + type: types_.FUNCTION + } + ]); + + var nativeArgs = { + id: this._id, + name: args.name, + listenerName: kSinkListenerNamePrefix + args.name + }; -//Pipeline::registerSinkCallback() end + var sinkListener = function(msg) { + var sinkData = new TensorsData(msg.tensorsDataId, msg.tensorsInfoId); + args.sinkListener(args.name, sinkData); + }; + native_.addListener(nativeArgs.listenerName, sinkListener); -//Pipeline::unregisterSinkCallback() begin + var result = native_.callSync('MLPipelineRegisterSinkListener', nativeArgs); + if (native_.isFailure(result)) { + native_.removeListener(nativeArgs.listenerName); -//Pipeline::unregisterSinkCallback() end + throw native_.getErrorObjectAndValidate( + result, + ValidRegisterSinkListenerExceptions, + AbortError + ); + } +}; +//Pipeline::registerSinkListener() end + +//Pipeline::unregisterSinkListener() begin +var ValidUnregisterSinkListenerExceptions = [ + 'InvalidValuesError', + 'NotFoundError', + 'NotSupportedError', + 'AbortError' +]; +Pipeline.prototype.unregisterSinkListener = function() { + var args = validator_.validateArgs(arguments, [ + { + name: 'name', + type: validator_.Types.STRING + } + ]); + + if (!args.has.name) { + throw new WebAPIException( + WebAPIException.INVALID_VALUES_ERR, + 'Invalid parameter: sink name is mandatory' + ); + } + + var nativeArgs = { + id: this._id, + name: args.name + }; + + var result = native_.callSync('MLPipelineUnregisterSinkListener', nativeArgs); + if (native_.isFailure(result)) { + throw native_.getErrorObjectAndValidate( + result, + ValidUnregisterSinkListenerExceptions, + AbortError + ); + } + + var listenerName = kSinkListenerNamePrefix + args.name; + native_.removeListener(listenerName); +}; +//Pipeline::unregisterSinkListener() end //Pipeline::registerCustomFilter() begin diff --git a/src/ml/ml.gyp b/src/ml/ml.gyp index 0b085533..738bdd29 100644 --- a/src/ml/ml.gyp +++ b/src/ml/ml.gyp @@ -21,6 +21,8 @@ 'ml_pipeline_manager.h', 'ml_pipeline_nodeinfo.cc', 'ml_pipeline_nodeinfo.h', + 'ml_pipeline_sink.cc', + 'ml_pipeline_sink.h', 'ml_pipeline_switch.cc', 'ml_pipeline_switch.h', 'ml_pipeline_source.h', diff --git a/src/ml/ml_instance.cc b/src/ml/ml_instance.cc index 20c6ec82..623269cd 100644 --- a/src/ml/ml_instance.cc +++ b/src/ml/ml_instance.cc @@ -52,6 +52,7 @@ const std::string kBuffer = "buffer"; const std::string kSize = "size"; const std::string kLocation = "location"; const std::string kShape = "shape"; +const std::string kListenerName = "listenerName"; } // namespace using namespace common; @@ -144,6 +145,8 @@ MlInstance::MlInstance() REGISTER_METHOD(MLPipelineGetSource); REGISTER_METHOD(MLPipelineGetInputTensorsInfo); REGISTER_METHOD(MLPipelineSourceInputData); + REGISTER_METHOD(MLPipelineRegisterSinkListener); + REGISTER_METHOD(MLPipelineUnregisterSinkListener); // Pipeline API end #undef REGISTER_METHOD @@ -1085,11 +1088,47 @@ void MlInstance::MLPipelineGetValve(const picojson::value& args, picojson::objec // Pipeline::getValve() end // Pipeline::registerSinkCallback() begin +void MlInstance::MLPipelineRegisterSinkListener(const picojson::value& args, + picojson::object& out) { + ScopeLogger("args: %s", args.serialize().c_str()); + + CHECK_ARGS(args, kId, double, out); + CHECK_ARGS(args, kName, std::string, out); + CHECK_ARGS(args, kListenerName, std::string, out); + auto pipeline_id = static_cast(args.get(kId).get()); + auto sink_name = args.get(kName).get(); + auto listener_name = args.get(kListenerName).get(); + + auto ret = pipeline_manager_.RegisterSinkListener(sink_name, pipeline_id, listener_name); + if (!ret) { + LogAndReportError(ret, &out); + return; + } + + ReportSuccess(out); +} // Pipeline::registerSinkCallback() end // Pipeline::unregisterSinkCallback() begin +void MlInstance::MLPipelineUnregisterSinkListener(const picojson::value& args, + picojson::object& out) { + ScopeLogger("args: %s", args.serialize().c_str()); + + CHECK_ARGS(args, kId, double, out); + CHECK_ARGS(args, kName, std::string, out); + auto pipeline_id = static_cast(args.get(kId).get()); + auto sink_name = args.get(kName).get(); + + auto ret = pipeline_manager_.UnregisterSinkListener(sink_name, pipeline_id); + if (!ret) { + LogAndReportError(ret, &out); + return; + } + + ReportSuccess(out); +} // Pipeline::unregisterSinkCallback() end // Pipeline::registerCustomFilter() begin diff --git a/src/ml/ml_instance.h b/src/ml/ml_instance.h index 36ff8834..e905bfde 100644 --- a/src/ml/ml_instance.h +++ b/src/ml/ml_instance.h @@ -118,11 +118,11 @@ class MlInstance : public common::ParsedInstance { // Pipeline::getValve() end // Pipeline::registerSinkCallback() begin - + void MLPipelineRegisterSinkListener(const picojson::value& args, picojson::object& out); // Pipeline::registerSinkCallback() end // Pipeline::unregisterSinkCallback() begin - + void MLPipelineUnregisterSinkListener(const picojson::value& args, picojson::object& out); // Pipeline::unregisterSinkCallback() end // Pipeline::registerCustomFilter() begin diff --git a/src/ml/ml_pipeline.cc b/src/ml/ml_pipeline.cc index 22928440..f3d408f5 100644 --- a/src/ml/ml_pipeline.cc +++ b/src/ml/ml_pipeline.cc @@ -18,6 +18,7 @@ #include "common/logger.h" #include "common/picojson.h" #include "ml_pipeline.h" +#include "ml_pipeline_sink.h" #include "ml_utils.h" using common::PlatformResult; @@ -63,10 +64,11 @@ namespace extension { namespace ml { Pipeline::Pipeline(int id, const std::string& state_change_listener_name, - common::Instance* instance_ptr) + TensorsInfoManager* tensors_info_manager_ptr, common::Instance* instance_ptr) : id_{id}, pipeline_{nullptr}, // this will be set to a proper pointer in CreatePipeline() state_change_listener_name_{state_change_listener_name}, + tensors_info_manager_ptr_{tensors_info_manager_ptr}, instance_ptr_{instance_ptr} { ScopeLogger("id: [%d], state_change_listener_name: [%s]", id, state_change_listener_name.c_str()); } @@ -75,6 +77,7 @@ Pipeline::Pipeline(int id, const std::string& state_change_listener_name, PlatformResult Pipeline::CreatePipeline(int id, const std::string& definition, const std::string& state_change_listener_name, common::Instance* instance_ptr, + TensorsInfoManager* tensors_info_manager_ptr, std::unique_ptr* out) { ScopeLogger("id: [%d], definition: [%s], state_change_listener_name: [%s]", id, definition.c_str(), state_change_listener_name.c_str()); @@ -83,8 +86,8 @@ PlatformResult Pipeline::CreatePipeline(int id, const std::string& definition, * because Pipeline is the user data for the listener registered by * ml_pipeline_construct(). */ - std::unique_ptr pipeline_ptr{ - new (std::nothrow) Pipeline{id, state_change_listener_name, instance_ptr}}; + std::unique_ptr pipeline_ptr{new (std::nothrow) Pipeline{ + id, state_change_listener_name, tensors_info_manager_ptr, instance_ptr}}; if (!pipeline_ptr) { return LogAndCreateResult(ErrorCode::ABORT_ERR, "An unknown occurred.", ("Could not allocate memory for Pipeline")); @@ -196,6 +199,8 @@ PlatformResult Pipeline::Dispose() { valves_.clear(); + sinks_.clear(); + sources_.clear(); auto ret = ml_pipeline_destroy(pipeline_); @@ -296,11 +301,45 @@ PlatformResult Pipeline::GetValve(const std::string& name) { // Pipeline::getValve() end // Pipeline::registerSinkCallback() begin +PlatformResult Pipeline::RegisterSinkListener(const std::string& sink_name, + const std::string& listener_name) { + ScopeLogger("sink_name: [%s], listener_name: [%s], id_: [%d]", sink_name.c_str(), + listener_name.c_str(), id_); + + if (sinks_.count(sink_name)) { + LoggerD("Listener for [%s] sink is already registered", sink_name.c_str()); + return PlatformResult{}; + } + + std::unique_ptr sink_ptr; + auto ret = Sink::CreateAndRegisterSink(sink_name, listener_name, pipeline_, instance_ptr_, + tensors_info_manager_ptr_, &sink_ptr); + if (!ret) { + return ret; + } + + sinks_.insert({sink_name, std::move(sink_ptr)}); + return PlatformResult{}; +} // Pipeline::registerSinkCallback() end // Pipeline::unregisterSinkCallback() begin +PlatformResult Pipeline::UnregisterSinkListener(const std::string& sink_name) { + ScopeLogger("sink_name: [%s], id_: [%d]", sink_name.c_str(), id_); + + auto sink_it = sinks_.find(sink_name); + if (sinks_.end() == sink_it) { + LoggerD("sink [%s] not found", sink_name.c_str()); + return PlatformResult{ErrorCode::ABORT_ERR, "An unknown error occurred"}; + } + auto ret = sink_it->second->Unregister(); + if (ret) { + sinks_.erase(sink_it); + } + return ret; +} // Pipeline::unregisterSinkCallback() end // Pipeline::registerCustomFilter() begin diff --git a/src/ml/ml_pipeline.h b/src/ml/ml_pipeline.h index 49b681ee..c529e6bc 100644 --- a/src/ml/ml_pipeline.h +++ b/src/ml/ml_pipeline.h @@ -27,6 +27,7 @@ #include "common/picojson.h" #include "common/platform_result.h" #include "ml_pipeline_nodeinfo.h" +#include "ml_pipeline_sink.h" #include "ml_pipeline_source.h" #include "ml_pipeline_switch.h" #include "ml_pipeline_valve.h" @@ -49,6 +50,7 @@ class Pipeline { static PlatformResult CreatePipeline(int id, const std::string& definition, const std::string& state_change_listener_name, common::Instance* instance_ptr, + TensorsInfoManager* tensors_info_manager_ptr, std::unique_ptr* out); // PipelineManager::createPipeline() end Pipeline() = delete; @@ -90,11 +92,12 @@ class Pipeline { // Pipeline::getValve() end // Pipeline::registerSinkCallback() begin - + PlatformResult RegisterSinkListener(const std::string& sink_name, + const std::string& listener_name); // Pipeline::registerSinkCallback() end // Pipeline::unregisterSinkCallback() begin - + PlatformResult UnregisterSinkListener(const std::string& sink_name); // Pipeline::unregisterSinkCallback() end // Pipeline::registerCustomFilter() begin @@ -133,11 +136,13 @@ class Pipeline { PlatformResult GetValve(const std::string& name, Valve** out); // Valve::setOpen() end private: - Pipeline(int id, const std::string& state_change_listener_name, common::Instance* instance_ptr); + Pipeline(int id, const std::string& state_change_listener_name, + TensorsInfoManager* tensors_info_manager_ptr, common::Instance* instance_ptr); const int id_; ml_pipeline_h pipeline_; const std::string state_change_listener_name_; + TensorsInfoManager* tensors_info_manager_ptr_; common::Instance* instance_ptr_; /* ######### VERY IMPORTANT ######### @@ -155,6 +160,7 @@ class Pipeline { std::unordered_map> switches_; std::map> node_info_; std::unordered_map> valves_; + std::unordered_map> sinks_; std::map> sources_; static void PipelineStateChangeListener(ml_pipeline_state_e state, void* user_data); diff --git a/src/ml/ml_pipeline_manager.cc b/src/ml/ml_pipeline_manager.cc index 7d898710..c2c3e1a8 100644 --- a/src/ml/ml_pipeline_manager.cc +++ b/src/ml/ml_pipeline_manager.cc @@ -49,7 +49,7 @@ PlatformResult PipelineManager::CreatePipeline(int id, const std::string& defini std::unique_ptr pipeline_ptr; auto ret = Pipeline::CreatePipeline(id, definition, state_change_listener_name, instance_ptr_, - &pipeline_ptr); + tensors_info_manager_, &pipeline_ptr); if (!ret) { return ret; } @@ -189,11 +189,34 @@ PlatformResult PipelineManager::GetValve(const std::string& name, int pipeline_i // Pipeline::getValve() end // Pipeline::registerSinkCallback() begin +PlatformResult PipelineManager::RegisterSinkListener(const std::string& sink_name, int pipeline_id, + const std::string& listener_name) { + ScopeLogger("sink_name: [%s], pipeline_id: [%d], listener_name: [%s]", sink_name.c_str(), + pipeline_id, listener_name.c_str()); + auto pipeline_it = pipelines_.find(pipeline_id); + if (pipelines_.end() == pipeline_it) { + LoggerD("Pipeline not found: [%d]", pipeline_id); + return PlatformResult{ErrorCode::NOT_FOUND_ERR, "Pipeline not found"}; + } + + return pipeline_it->second->RegisterSinkListener(sink_name, listener_name); +} // Pipeline::registerSinkCallback() end // Pipeline::unregisterSinkCallback() begin +PlatformResult PipelineManager::UnregisterSinkListener(const std::string& sink_name, + int pipeline_id) { + ScopeLogger("sink_name: [%s], pipeline_id: [%d]", sink_name.c_str(), pipeline_id); + auto pipeline_it = pipelines_.find(pipeline_id); + if (pipelines_.end() == pipeline_it) { + LoggerD("Pipeline not found: [%d]", pipeline_id); + return PlatformResult{ErrorCode::NOT_FOUND_ERR, "Pipeline not found"}; + } + + return pipeline_it->second->UnregisterSinkListener(sink_name); +} // Pipeline::unregisterSinkCallback() end // Pipeline::registerCustomFilter() begin diff --git a/src/ml/ml_pipeline_manager.h b/src/ml/ml_pipeline_manager.h index b76d3a66..a2e6fa9c 100644 --- a/src/ml/ml_pipeline_manager.h +++ b/src/ml/ml_pipeline_manager.h @@ -78,11 +78,12 @@ class PipelineManager { // Pipeline::getValve() end // Pipeline::registerSinkCallback() begin - + PlatformResult RegisterSinkListener(const std::string& sink_name, int pipeline_id, + const std::string& listener_name); // Pipeline::registerSinkCallback() end // Pipeline::unregisterSinkCallback() begin - + PlatformResult UnregisterSinkListener(const std::string& sink_name, int pipeline_id); // Pipeline::unregisterSinkCallback() end // Pipeline::registerCustomFilter() begin diff --git a/src/ml/ml_pipeline_sink.cc b/src/ml/ml_pipeline_sink.cc new file mode 100644 index 00000000..d5a14205 --- /dev/null +++ b/src/ml/ml_pipeline_sink.cc @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "ml_pipeline_sink.h" +#include "ml_utils.h" + +using common::PlatformResult; +using common::ErrorCode; + +namespace { + +const std::string kListenerId = "listenerId"; +const std::string kTensorsDataId = "tensorsDataId"; +const std::string kTensorsInfoId = "tensorsInfoId"; + +} // namespace + +namespace extension { +namespace ml { +namespace pipeline { + +PlatformResult Sink::CreateAndRegisterSink(const std::string& name, + const std::string& listener_name, ml_pipeline_h pipeline, + common::Instance* instance_ptr, + TensorsInfoManager* tensors_info_manager_ptr, + std::unique_ptr* out) { + ScopeLogger( + "name: [%s], listener_name: [%s], pipeline: [%p], instance_ptr: [%p], " + "tensors_info_manager_ptr: [%p]", + name.c_str(), listener_name.c_str(), pipeline, instance_ptr, tensors_info_manager_ptr); + + auto sink_ptr = std::unique_ptr( + new (std::nothrow) Sink{name, listener_name, instance_ptr, tensors_info_manager_ptr}); + if (!sink_ptr) { + return LogAndCreateResult(ErrorCode::ABORT_ERR, "Could not register sink listener", + ("Could not allocate memory")); + } + + ml_pipeline_sink_h sink_handle = nullptr; + auto ret = ml_pipeline_sink_register(pipeline, name.c_str(), SinkListener, + static_cast(sink_ptr.get()), &sink_handle); + if (ML_ERROR_NONE != ret) { + LoggerE("ml_pipeline_sink_register() failed: [%d] (%s)", ret, get_error_message(ret)); + return util::ToPlatformResult(ret, "Could not register sink listener"); + } + LoggerD("ml_pipeline_sink_register() succeeded"); + sink_ptr->sink_ = sink_handle; + + *out = std::move(sink_ptr); + + return PlatformResult{}; +} + +Sink::Sink(const std::string& name, const std::string& listener_name, + common::Instance* instance_ptr, TensorsInfoManager* tensors_info_manager_ptr) + : name_{name}, + listener_name_{listener_name}, + sink_{nullptr}, + instance_ptr_{instance_ptr}, + tensors_info_manager_ptr_{tensors_info_manager_ptr} { + ScopeLogger( + "name: [%s]_, listener_name: [%s]_, instance_ptr_: [%p], tensors_info_manager_ptr_: [%p]", + name.c_str(), listener_name.c_str(), instance_ptr_, tensors_info_manager_ptr_); +} + +Sink::~Sink() { + ScopeLogger("name: [%s]_, listener_name_: [%s], sink_: [%p], instance_ptr_: [%p]", name_.c_str(), + listener_name_.c_str(), sink_, instance_ptr_); + + Unregister(); +} + +void Sink::SinkListener(ml_tensors_data_h tensors_data_src, ml_tensors_info_h tensors_info_src, + void* user_data) { + ScopeLogger("tensors_data_src: [%p], tensors_info_src: [%p], user_data: [%p]", tensors_data_src, + tensors_info_src, user_data); + + if (!user_data) { + LoggerE("user_data is a nullptr"); + return; + } + + Sink* sink_ptr = static_cast(user_data); + + auto* tensors_data_ptr = sink_ptr->tensors_info_manager_ptr_->CloneNativeTensorWithData( + tensors_info_src, tensors_data_src); + if (!tensors_data_ptr) { + LoggerE("Could not clone tensors data. Sink listener won't be triggered."); + return; + } + + picojson::value response{picojson::object{}}; + auto& response_obj = response.get(); + response_obj[kListenerId] = picojson::value{sink_ptr->listener_name_}; + response_obj[kTensorsDataId] = picojson::value{static_cast(tensors_data_ptr->Id())}; + response_obj[kTensorsInfoId] = + picojson::value{static_cast(tensors_data_ptr->TensorsInfoId())}; + + common::Instance::PostMessage(sink_ptr->instance_ptr_, response); +} + +PlatformResult Sink::Unregister() { + ScopeLogger("name_: [%s], listener_name_: [%s], sink_: [%p], instance_ptr_: [%p]", name_.c_str(), + listener_name_.c_str(), sink_, instance_ptr_); + + if (!sink_) { + LoggerD("Sink was already unregistered"); + return PlatformResult{}; + } + + auto ret = ml_pipeline_sink_unregister(sink_); + if (ML_ERROR_NONE != ret) { + LoggerE("ml_pipeline_sink_unregister() failed: [%d] (%s)", ret, get_error_message(ret)); + return util::ToPlatformResult(ret, "Could not unregister sink"); + } + LoggerD("ml_pipeline_sink_unregister() succeeded"); + + sink_ = nullptr; + + return PlatformResult{}; +} + +} // namespace pipeline +} // namespace ml +} // namespace extension \ No newline at end of file diff --git a/src/ml/ml_pipeline_sink.h b/src/ml/ml_pipeline_sink.h new file mode 100644 index 00000000..b7970c5e --- /dev/null +++ b/src/ml/ml_pipeline_sink.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ML_ML_PIPELINE_SINK_H_ +#define ML_ML_PIPELINE_SINK_H_ + +#include +#include + +#include + +#include "common/extension.h" +#include "common/platform_result.h" + +#include "ml_tensors_data_manager.h" +#include "ml_tensors_info_manager.h" + +using common::PlatformResult; + +namespace extension { +namespace ml { +namespace pipeline { + +class Sink { + public: + static PlatformResult CreateAndRegisterSink(const std::string& name, + const std::string& listener_name, + ml_pipeline_h pipeline, + common::Instance* instance_ptr, + TensorsInfoManager* tensors_info_manager_ptr, + std::unique_ptr* out); + + ~Sink(); + + PlatformResult Unregister(); + + Sink(const Sink&) = delete; + Sink& operator=(const Sink&) = delete; + + private: + Sink(const std::string& name, const std::string& listener_name, common::Instance* instance_ptr, + TensorsInfoManager* tensors_info_manager_pt); + + static void SinkListener(ml_tensors_data_h tensors_data, ml_tensors_info_h tensors_info, + void* user_data); + + const std::string name_; + const std::string listener_name_; + ml_pipeline_sink_h sink_; + common::Instance* instance_ptr_; + TensorsInfoManager* tensors_info_manager_ptr_; +}; + +} // namespace pipeline +} // namespace ml +} // namespace extension + +#endif // ML_ML_PIPELINE_SINK_H_ -- 2.34.1