[ML][pipeline] Implement Pipeline::{register,unregister}SinkListener 00/252700/13
authorPawel Wasowski <p.wasowski2@samsung.com>
Mon, 1 Feb 2021 14:01:13 +0000 (15:01 +0100)
committerPawel Wasowski <p.wasowski2@samsung.com>
Fri, 12 Feb 2021 12:34:43 +0000 (12:34 +0000)
ACR: TWDAPI-274

[Verification] Tested in Chrome DevTools with the snippets below. Works
fine

var pipeline_def = "videotestsrc num-buffers=3 ! videoconvert"
                   + " ! tensor_converter ! tensor_sink name=sinkx";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                      function(state) {console.log(state);});
// READY
// PAUSED

pipeline.registerSinkListener('sinkx', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

pipeline.start()
// registered SinkListener is called 3 times

Change-Id: I8790327d070e54d27fd9cc028882773487a48d7a
Signed-off-by: Pawel Wasowski <p.wasowski2@samsung.com>
src/ml/js/ml_pipeline.js
src/ml/ml.gyp
src/ml/ml_instance.cc
src/ml/ml_instance.h
src/ml/ml_pipeline.cc
src/ml/ml_pipeline.h
src/ml/ml_pipeline_manager.cc
src/ml/ml_pipeline_manager.h
src/ml/ml_pipeline_sink.cc [new file with mode: 0644]
src/ml/ml_pipeline_sink.h [new file with mode: 0644]

index 5429ba7b99e7b29ee62907f7a7f274507707162e..ef68dcd79adb4396fda57b6a753399da72046a5a 100755 (executable)
@@ -15,6 +15,7 @@
  */
 
 var kPipelineStateChangeListenerNamePrefix = 'MLPipelineStateChangeListener';
+var kSinkListenerNamePrefix = 'MLPipelineSinkListener';
 
 //PipelineManager::createPipeline() begin
 var ValidPipelineDisposeExceptions = ['NotFoundError', 'NotSupportedError', 'AbortError'];
@@ -398,13 +399,91 @@ Pipeline.prototype.getValve = function() {
 };
 //Pipeline::getValve() end
 
-//Pipeline::registerSinkCallback() begin
+//Pipeline::registerSinkListener() begin
+var ValidRegisterSinkListenerExceptions = [
+    'InvalidValuesError',
+    'NotFoundError',
+    'NotSupportedError',
+    'TypeMismatchError',
+    'AbortError'
+];
+Pipeline.prototype.registerSinkListener = function() {
+    var args = validator_.validateArgs(arguments, [
+        {
+            name: 'name',
+            type: validator_.Types.STRING
+        },
+        {
+            name: 'sinkListener',
+            type: types_.FUNCTION
+        }
+    ]);
+
+    var nativeArgs = {
+        id: this._id,
+        name: args.name,
+        listenerName: kSinkListenerNamePrefix + args.name
+    };
 
-//Pipeline::registerSinkCallback() end
+    var sinkListener = function(msg) {
+        var sinkData = new TensorsData(msg.tensorsDataId, msg.tensorsInfoId);
+        args.sinkListener(args.name, sinkData);
+    };
+    native_.addListener(nativeArgs.listenerName, sinkListener);
 
-//Pipeline::unregisterSinkCallback() begin
+    var result = native_.callSync('MLPipelineRegisterSinkListener', nativeArgs);
+    if (native_.isFailure(result)) {
+        native_.removeListener(nativeArgs.listenerName);
 
-//Pipeline::unregisterSinkCallback() end
+        throw native_.getErrorObjectAndValidate(
+            result,
+            ValidRegisterSinkListenerExceptions,
+            AbortError
+        );
+    }
+};
+//Pipeline::registerSinkListener() end
+
+//Pipeline::unregisterSinkListener() begin
+var ValidUnregisterSinkListenerExceptions = [
+    'InvalidValuesError',
+    'NotFoundError',
+    'NotSupportedError',
+    'AbortError'
+];
+Pipeline.prototype.unregisterSinkListener = function() {
+    var args = validator_.validateArgs(arguments, [
+        {
+            name: 'name',
+            type: validator_.Types.STRING
+        }
+    ]);
+
+    if (!args.has.name) {
+        throw new WebAPIException(
+            WebAPIException.INVALID_VALUES_ERR,
+            'Invalid parameter: sink name is mandatory'
+        );
+    }
+
+    var nativeArgs = {
+        id: this._id,
+        name: args.name
+    };
+
+    var result = native_.callSync('MLPipelineUnregisterSinkListener', nativeArgs);
+    if (native_.isFailure(result)) {
+        throw native_.getErrorObjectAndValidate(
+            result,
+            ValidUnregisterSinkListenerExceptions,
+            AbortError
+        );
+    }
+
+    var listenerName = kSinkListenerNamePrefix + args.name;
+    native_.removeListener(listenerName);
+};
+//Pipeline::unregisterSinkListener() end
 
 //Pipeline::registerCustomFilter() begin
 
index 0b0855338cad283163cdfd9555da910c3004905e..738bdd291a74a2d183636ad74d2d1b7b6b815ddd 100644 (file)
@@ -21,6 +21,8 @@
         'ml_pipeline_manager.h',
         'ml_pipeline_nodeinfo.cc',
         'ml_pipeline_nodeinfo.h',
+        'ml_pipeline_sink.cc',
+        'ml_pipeline_sink.h',
         'ml_pipeline_switch.cc',
         'ml_pipeline_switch.h',
         'ml_pipeline_source.h',
index 20c6ec8235153af6ac1ed7fa375718f293e7f154..623269cde8942fd3f8605f9156bcaf59e1340363 100644 (file)
@@ -52,6 +52,7 @@ const std::string kBuffer = "buffer";
 const std::string kSize = "size";
 const std::string kLocation = "location";
 const std::string kShape = "shape";
+const std::string kListenerName = "listenerName";
 }  //  namespace
 
 using namespace common;
@@ -144,6 +145,8 @@ MlInstance::MlInstance()
   REGISTER_METHOD(MLPipelineGetSource);
   REGISTER_METHOD(MLPipelineGetInputTensorsInfo);
   REGISTER_METHOD(MLPipelineSourceInputData);
+  REGISTER_METHOD(MLPipelineRegisterSinkListener);
+  REGISTER_METHOD(MLPipelineUnregisterSinkListener);
 // Pipeline API end
 
 #undef REGISTER_METHOD
@@ -1085,11 +1088,47 @@ void MlInstance::MLPipelineGetValve(const picojson::value& args, picojson::objec
 // Pipeline::getValve() end
 
 // Pipeline::registerSinkCallback() begin
+void MlInstance::MLPipelineRegisterSinkListener(const picojson::value& args,
+                                                picojson::object& out) {
+  ScopeLogger("args: %s", args.serialize().c_str());
+
+  CHECK_ARGS(args, kId, double, out);
+  CHECK_ARGS(args, kName, std::string, out);
+  CHECK_ARGS(args, kListenerName, std::string, out);
 
+  auto pipeline_id = static_cast<int>(args.get(kId).get<double>());
+  auto sink_name = args.get(kName).get<std::string>();
+  auto listener_name = args.get(kListenerName).get<std::string>();
+
+  auto ret = pipeline_manager_.RegisterSinkListener(sink_name, pipeline_id, listener_name);
+  if (!ret) {
+    LogAndReportError(ret, &out);
+    return;
+  }
+
+  ReportSuccess(out);
+}
 // Pipeline::registerSinkCallback() end
 
 // Pipeline::unregisterSinkCallback() begin
+void MlInstance::MLPipelineUnregisterSinkListener(const picojson::value& args,
+                                                  picojson::object& out) {
+  ScopeLogger("args: %s", args.serialize().c_str());
+
+  CHECK_ARGS(args, kId, double, out);
+  CHECK_ARGS(args, kName, std::string, out);
 
+  auto pipeline_id = static_cast<int>(args.get(kId).get<double>());
+  auto sink_name = args.get(kName).get<std::string>();
+
+  auto ret = pipeline_manager_.UnregisterSinkListener(sink_name, pipeline_id);
+  if (!ret) {
+    LogAndReportError(ret, &out);
+    return;
+  }
+
+  ReportSuccess(out);
+}
 // Pipeline::unregisterSinkCallback() end
 
 // Pipeline::registerCustomFilter() begin
index 36ff883424dc953c0f142208c389116812301e2a..e905bfde0addaa9e971cabe318d11c992f6f036a 100644 (file)
@@ -118,11 +118,11 @@ class MlInstance : public common::ParsedInstance {
   // Pipeline::getValve() end
 
   // Pipeline::registerSinkCallback() begin
-
+  void MLPipelineRegisterSinkListener(const picojson::value& args, picojson::object& out);
   // Pipeline::registerSinkCallback() end
 
   // Pipeline::unregisterSinkCallback() begin
-
+  void MLPipelineUnregisterSinkListener(const picojson::value& args, picojson::object& out);
   // Pipeline::unregisterSinkCallback() end
 
   // Pipeline::registerCustomFilter() begin
index 22928440f6bad16b1875ffa009c6e634ac142070..f3d408f59314bb9b5c4ccd4bddc58aff9ccca680 100644 (file)
@@ -18,6 +18,7 @@
 #include "common/logger.h"
 #include "common/picojson.h"
 #include "ml_pipeline.h"
+#include "ml_pipeline_sink.h"
 #include "ml_utils.h"
 
 using common::PlatformResult;
@@ -63,10 +64,11 @@ namespace extension {
 namespace ml {
 
 Pipeline::Pipeline(int id, const std::string& state_change_listener_name,
-                   common::Instance* instance_ptr)
+                   TensorsInfoManager* tensors_info_manager_ptr, common::Instance* instance_ptr)
     : id_{id},
       pipeline_{nullptr},  // this will be set to a proper pointer in CreatePipeline()
       state_change_listener_name_{state_change_listener_name},
+      tensors_info_manager_ptr_{tensors_info_manager_ptr},
       instance_ptr_{instance_ptr} {
   ScopeLogger("id: [%d], state_change_listener_name: [%s]", id, state_change_listener_name.c_str());
 }
@@ -75,6 +77,7 @@ Pipeline::Pipeline(int id, const std::string& state_change_listener_name,
 PlatformResult Pipeline::CreatePipeline(int id, const std::string& definition,
                                         const std::string& state_change_listener_name,
                                         common::Instance* instance_ptr,
+                                        TensorsInfoManager* tensors_info_manager_ptr,
                                         std::unique_ptr<Pipeline>* out) {
   ScopeLogger("id: [%d], definition: [%s], state_change_listener_name: [%s]", id,
               definition.c_str(), state_change_listener_name.c_str());
@@ -83,8 +86,8 @@ PlatformResult Pipeline::CreatePipeline(int id, const std::string& definition,
    * because Pipeline is the user data for the listener registered by
    * ml_pipeline_construct().
    */
-  std::unique_ptr<Pipeline> pipeline_ptr{
-      new (std::nothrow) Pipeline{id, state_change_listener_name, instance_ptr}};
+  std::unique_ptr<Pipeline> pipeline_ptr{new (std::nothrow) Pipeline{
+      id, state_change_listener_name, tensors_info_manager_ptr, instance_ptr}};
   if (!pipeline_ptr) {
     return LogAndCreateResult(ErrorCode::ABORT_ERR, "An unknown occurred.",
                               ("Could not allocate memory for Pipeline"));
@@ -196,6 +199,8 @@ PlatformResult Pipeline::Dispose() {
 
   valves_.clear();
 
+  sinks_.clear();
+
   sources_.clear();
 
   auto ret = ml_pipeline_destroy(pipeline_);
@@ -296,11 +301,45 @@ PlatformResult Pipeline::GetValve(const std::string& name) {
 // Pipeline::getValve() end
 
 // Pipeline::registerSinkCallback() begin
+PlatformResult Pipeline::RegisterSinkListener(const std::string& sink_name,
+                                              const std::string& listener_name) {
+  ScopeLogger("sink_name: [%s], listener_name: [%s], id_: [%d]", sink_name.c_str(),
+              listener_name.c_str(), id_);
+
+  if (sinks_.count(sink_name)) {
+    LoggerD("Listener for [%s] sink is already registered", sink_name.c_str());
+    return PlatformResult{};
+  }
+
+  std::unique_ptr<Sink> sink_ptr;
+  auto ret = Sink::CreateAndRegisterSink(sink_name, listener_name, pipeline_, instance_ptr_,
+                                         tensors_info_manager_ptr_, &sink_ptr);
+  if (!ret) {
+    return ret;
+  }
+
+  sinks_.insert({sink_name, std::move(sink_ptr)});
 
+  return PlatformResult{};
+}
 // Pipeline::registerSinkCallback() end
 
 // Pipeline::unregisterSinkCallback() begin
+PlatformResult Pipeline::UnregisterSinkListener(const std::string& sink_name) {
+  ScopeLogger("sink_name: [%s], id_: [%d]", sink_name.c_str(), id_);
+
+  auto sink_it = sinks_.find(sink_name);
+  if (sinks_.end() == sink_it) {
+    LoggerD("sink [%s] not found", sink_name.c_str());
+    return PlatformResult{ErrorCode::ABORT_ERR, "An unknown error occurred"};
+  }
 
+  auto ret = sink_it->second->Unregister();
+  if (ret) {
+    sinks_.erase(sink_it);
+  }
+  return ret;
+}
 // Pipeline::unregisterSinkCallback() end
 
 // Pipeline::registerCustomFilter() begin
index 49b681ee942614d409baf7d97dc104af066c2e49..c529e6bc7ff6697e44c9285e2f67524d1d6954fc 100644 (file)
@@ -27,6 +27,7 @@
 #include "common/picojson.h"
 #include "common/platform_result.h"
 #include "ml_pipeline_nodeinfo.h"
+#include "ml_pipeline_sink.h"
 #include "ml_pipeline_source.h"
 #include "ml_pipeline_switch.h"
 #include "ml_pipeline_valve.h"
@@ -49,6 +50,7 @@ class Pipeline {
   static PlatformResult CreatePipeline(int id, const std::string& definition,
                                        const std::string& state_change_listener_name,
                                        common::Instance* instance_ptr,
+                                       TensorsInfoManager* tensors_info_manager_ptr,
                                        std::unique_ptr<Pipeline>* out);
   // PipelineManager::createPipeline() end
   Pipeline() = delete;
@@ -90,11 +92,12 @@ class Pipeline {
   // Pipeline::getValve() end
 
   // Pipeline::registerSinkCallback() begin
-
+  PlatformResult RegisterSinkListener(const std::string& sink_name,
+                                      const std::string& listener_name);
   // Pipeline::registerSinkCallback() end
 
   // Pipeline::unregisterSinkCallback() begin
-
+  PlatformResult UnregisterSinkListener(const std::string& sink_name);
   // Pipeline::unregisterSinkCallback() end
 
   // Pipeline::registerCustomFilter() begin
@@ -133,11 +136,13 @@ class Pipeline {
   PlatformResult GetValve(const std::string& name, Valve** out);
   // Valve::setOpen() end
  private:
-  Pipeline(int id, const std::string& state_change_listener_name, common::Instance* instance_ptr);
+  Pipeline(int id, const std::string& state_change_listener_name,
+           TensorsInfoManager* tensors_info_manager_ptr, common::Instance* instance_ptr);
 
   const int id_;
   ml_pipeline_h pipeline_;
   const std::string state_change_listener_name_;
+  TensorsInfoManager* tensors_info_manager_ptr_;
   common::Instance* instance_ptr_;
 
   /* ######### VERY IMPORTANT #########
@@ -155,6 +160,7 @@ class Pipeline {
   std::unordered_map<std::string, std::unique_ptr<Switch>> switches_;
   std::map<std::string, std::unique_ptr<NodeInfo>> node_info_;
   std::unordered_map<std::string, std::unique_ptr<Valve>> valves_;
+  std::unordered_map<std::string, std::unique_ptr<Sink>> sinks_;
   std::map<std::string, std::unique_ptr<Source>> sources_;
 
   static void PipelineStateChangeListener(ml_pipeline_state_e state, void* user_data);
index 7d898710313e9315d16066ab4c4460efef8e8a57..c2c3e1a879fd01033717f6a3116496f62df27c1e 100644 (file)
@@ -49,7 +49,7 @@ PlatformResult PipelineManager::CreatePipeline(int id, const std::string& defini
 
   std::unique_ptr<Pipeline> pipeline_ptr;
   auto ret = Pipeline::CreatePipeline(id, definition, state_change_listener_name, instance_ptr_,
-                                      &pipeline_ptr);
+                                      tensors_info_manager_, &pipeline_ptr);
   if (!ret) {
     return ret;
   }
@@ -189,11 +189,34 @@ PlatformResult PipelineManager::GetValve(const std::string& name, int pipeline_i
 // Pipeline::getValve() end
 
 // Pipeline::registerSinkCallback() begin
+PlatformResult PipelineManager::RegisterSinkListener(const std::string& sink_name, int pipeline_id,
+                                                     const std::string& listener_name) {
+  ScopeLogger("sink_name: [%s], pipeline_id: [%d], listener_name: [%s]", sink_name.c_str(),
+              pipeline_id, listener_name.c_str());
 
+  auto pipeline_it = pipelines_.find(pipeline_id);
+  if (pipelines_.end() == pipeline_it) {
+    LoggerD("Pipeline not found: [%d]", pipeline_id);
+    return PlatformResult{ErrorCode::NOT_FOUND_ERR, "Pipeline not found"};
+  }
+
+  return pipeline_it->second->RegisterSinkListener(sink_name, listener_name);
+}
 // Pipeline::registerSinkCallback() end
 
 // Pipeline::unregisterSinkCallback() begin
+PlatformResult PipelineManager::UnregisterSinkListener(const std::string& sink_name,
+                                                       int pipeline_id) {
+  ScopeLogger("sink_name: [%s], pipeline_id: [%d]", sink_name.c_str(), pipeline_id);
 
+  auto pipeline_it = pipelines_.find(pipeline_id);
+  if (pipelines_.end() == pipeline_it) {
+    LoggerD("Pipeline not found: [%d]", pipeline_id);
+    return PlatformResult{ErrorCode::NOT_FOUND_ERR, "Pipeline not found"};
+  }
+
+  return pipeline_it->second->UnregisterSinkListener(sink_name);
+}
 // Pipeline::unregisterSinkCallback() end
 
 // Pipeline::registerCustomFilter() begin
index b76d3a6629022f86315d94fc9eb2eb5f427ddf14..a2e6fa9c7cd42492108016c4981ffbdd0fc434bb 100644 (file)
@@ -78,11 +78,12 @@ class PipelineManager {
   // Pipeline::getValve() end
 
   // Pipeline::registerSinkCallback() begin
-
+  PlatformResult RegisterSinkListener(const std::string& sink_name, int pipeline_id,
+                                      const std::string& listener_name);
   // Pipeline::registerSinkCallback() end
 
   // Pipeline::unregisterSinkCallback() begin
-
+  PlatformResult UnregisterSinkListener(const std::string& sink_name, int pipeline_id);
   // Pipeline::unregisterSinkCallback() end
 
   // Pipeline::registerCustomFilter() begin
diff --git a/src/ml/ml_pipeline_sink.cc b/src/ml/ml_pipeline_sink.cc
new file mode 100644 (file)
index 0000000..d5a1420
--- /dev/null
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#include <utility>
+
+#include "ml_pipeline_sink.h"
+#include "ml_utils.h"
+
+using common::PlatformResult;
+using common::ErrorCode;
+
+namespace {
+
+const std::string kListenerId = "listenerId";
+const std::string kTensorsDataId = "tensorsDataId";
+const std::string kTensorsInfoId = "tensorsInfoId";
+
+}  //  namespace
+
+namespace extension {
+namespace ml {
+namespace pipeline {
+
+PlatformResult Sink::CreateAndRegisterSink(const std::string& name,
+                                           const std::string& listener_name, ml_pipeline_h pipeline,
+                                           common::Instance* instance_ptr,
+                                           TensorsInfoManager* tensors_info_manager_ptr,
+                                           std::unique_ptr<Sink>* out) {
+  ScopeLogger(
+      "name: [%s], listener_name: [%s], pipeline: [%p], instance_ptr: [%p], "
+      "tensors_info_manager_ptr: [%p]",
+      name.c_str(), listener_name.c_str(), pipeline, instance_ptr, tensors_info_manager_ptr);
+
+  auto sink_ptr = std::unique_ptr<Sink>(
+      new (std::nothrow) Sink{name, listener_name, instance_ptr, tensors_info_manager_ptr});
+  if (!sink_ptr) {
+    return LogAndCreateResult(ErrorCode::ABORT_ERR, "Could not register sink listener",
+                              ("Could not allocate memory"));
+  }
+
+  ml_pipeline_sink_h sink_handle = nullptr;
+  auto ret = ml_pipeline_sink_register(pipeline, name.c_str(), SinkListener,
+                                       static_cast<void*>(sink_ptr.get()), &sink_handle);
+  if (ML_ERROR_NONE != ret) {
+    LoggerE("ml_pipeline_sink_register() failed: [%d] (%s)", ret, get_error_message(ret));
+    return util::ToPlatformResult(ret, "Could not register sink listener");
+  }
+  LoggerD("ml_pipeline_sink_register() succeeded");
+  sink_ptr->sink_ = sink_handle;
+
+  *out = std::move(sink_ptr);
+
+  return PlatformResult{};
+}
+
+Sink::Sink(const std::string& name, const std::string& listener_name,
+           common::Instance* instance_ptr, TensorsInfoManager* tensors_info_manager_ptr)
+    : name_{name},
+      listener_name_{listener_name},
+      sink_{nullptr},
+      instance_ptr_{instance_ptr},
+      tensors_info_manager_ptr_{tensors_info_manager_ptr} {
+  ScopeLogger(
+      "name: [%s]_, listener_name: [%s]_, instance_ptr_: [%p], tensors_info_manager_ptr_: [%p]",
+      name.c_str(), listener_name.c_str(), instance_ptr_, tensors_info_manager_ptr_);
+}
+
+Sink::~Sink() {
+  ScopeLogger("name: [%s]_, listener_name_: [%s], sink_: [%p], instance_ptr_: [%p]", name_.c_str(),
+              listener_name_.c_str(), sink_, instance_ptr_);
+
+  Unregister();
+}
+
+void Sink::SinkListener(ml_tensors_data_h tensors_data_src, ml_tensors_info_h tensors_info_src,
+                        void* user_data) {
+  ScopeLogger("tensors_data_src: [%p], tensors_info_src: [%p], user_data: [%p]", tensors_data_src,
+              tensors_info_src, user_data);
+
+  if (!user_data) {
+    LoggerE("user_data is a nullptr");
+    return;
+  }
+
+  Sink* sink_ptr = static_cast<Sink*>(user_data);
+
+  auto* tensors_data_ptr = sink_ptr->tensors_info_manager_ptr_->CloneNativeTensorWithData(
+      tensors_info_src, tensors_data_src);
+  if (!tensors_data_ptr) {
+    LoggerE("Could not clone tensors data. Sink listener won't be triggered.");
+    return;
+  }
+
+  picojson::value response{picojson::object{}};
+  auto& response_obj = response.get<picojson::object>();
+  response_obj[kListenerId] = picojson::value{sink_ptr->listener_name_};
+  response_obj[kTensorsDataId] = picojson::value{static_cast<double>(tensors_data_ptr->Id())};
+  response_obj[kTensorsInfoId] =
+      picojson::value{static_cast<double>(tensors_data_ptr->TensorsInfoId())};
+
+  common::Instance::PostMessage(sink_ptr->instance_ptr_, response);
+}
+
+PlatformResult Sink::Unregister() {
+  ScopeLogger("name_: [%s], listener_name_: [%s], sink_: [%p], instance_ptr_: [%p]", name_.c_str(),
+              listener_name_.c_str(), sink_, instance_ptr_);
+
+  if (!sink_) {
+    LoggerD("Sink was already unregistered");
+    return PlatformResult{};
+  }
+
+  auto ret = ml_pipeline_sink_unregister(sink_);
+  if (ML_ERROR_NONE != ret) {
+    LoggerE("ml_pipeline_sink_unregister() failed: [%d] (%s)", ret, get_error_message(ret));
+    return util::ToPlatformResult(ret, "Could not unregister sink");
+  }
+  LoggerD("ml_pipeline_sink_unregister() succeeded");
+
+  sink_ = nullptr;
+
+  return PlatformResult{};
+}
+
+}  // namespace pipeline
+}  // namespace ml
+}  // namespace extension
\ No newline at end of file
diff --git a/src/ml/ml_pipeline_sink.h b/src/ml/ml_pipeline_sink.h
new file mode 100644 (file)
index 0000000..b7970c5
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifndef ML_ML_PIPELINE_SINK_H_
+#define ML_ML_PIPELINE_SINK_H_
+
+#include <memory>
+#include <string>
+
+#include <nnstreamer/nnstreamer.h>
+
+#include "common/extension.h"
+#include "common/platform_result.h"
+
+#include "ml_tensors_data_manager.h"
+#include "ml_tensors_info_manager.h"
+
+using common::PlatformResult;
+
+namespace extension {
+namespace ml {
+namespace pipeline {
+
+class Sink {
+ public:
+  static PlatformResult CreateAndRegisterSink(const std::string& name,
+                                              const std::string& listener_name,
+                                              ml_pipeline_h pipeline,
+                                              common::Instance* instance_ptr,
+                                              TensorsInfoManager* tensors_info_manager_ptr,
+                                              std::unique_ptr<Sink>* out);
+
+  ~Sink();
+
+  PlatformResult Unregister();
+
+  Sink(const Sink&) = delete;
+  Sink& operator=(const Sink&) = delete;
+
+ private:
+  Sink(const std::string& name, const std::string& listener_name, common::Instance* instance_ptr,
+       TensorsInfoManager* tensors_info_manager_pt);
+
+  static void SinkListener(ml_tensors_data_h tensors_data, ml_tensors_info_h tensors_info,
+                           void* user_data);
+
+  const std::string name_;
+  const std::string listener_name_;
+  ml_pipeline_sink_h sink_;
+  common::Instance* instance_ptr_;
+  TensorsInfoManager* tensors_info_manager_ptr_;
+};
+
+}  // namespace pipeline
+}  // namespace ml
+}  // namespace extension
+
+#endif  // ML_ML_PIPELINE_SINK_H_