*/
var kPipelineStateChangeListenerNamePrefix = 'MLPipelineStateChangeListener';
+var kSinkListenerNamePrefix = 'MLPipelineSinkListener';
//PipelineManager::createPipeline() begin
var ValidPipelineDisposeExceptions = ['NotFoundError', 'NotSupportedError', 'AbortError'];
};
//Pipeline::getValve() end
-//Pipeline::registerSinkCallback() begin
+//Pipeline::registerSinkListener() begin
+var ValidRegisterSinkListenerExceptions = [
+ 'InvalidValuesError',
+ 'NotFoundError',
+ 'NotSupportedError',
+ 'TypeMismatchError',
+ 'AbortError'
+];
+Pipeline.prototype.registerSinkListener = function() {
+ var args = validator_.validateArgs(arguments, [
+ {
+ name: 'name',
+ type: validator_.Types.STRING
+ },
+ {
+ name: 'sinkListener',
+ type: types_.FUNCTION
+ }
+ ]);
+
+ var nativeArgs = {
+ id: this._id,
+ name: args.name,
+ listenerName: kSinkListenerNamePrefix + args.name
+ };
-//Pipeline::registerSinkCallback() end
+ var sinkListener = function(msg) {
+ var sinkData = new TensorsData(msg.tensorsDataId, msg.tensorsInfoId);
+ args.sinkListener(args.name, sinkData);
+ };
+ native_.addListener(nativeArgs.listenerName, sinkListener);
-//Pipeline::unregisterSinkCallback() begin
+ var result = native_.callSync('MLPipelineRegisterSinkListener', nativeArgs);
+ if (native_.isFailure(result)) {
+ native_.removeListener(nativeArgs.listenerName);
-//Pipeline::unregisterSinkCallback() end
+ throw native_.getErrorObjectAndValidate(
+ result,
+ ValidRegisterSinkListenerExceptions,
+ AbortError
+ );
+ }
+};
+//Pipeline::registerSinkListener() end
+
+//Pipeline::unregisterSinkListener() begin
+var ValidUnregisterSinkListenerExceptions = [
+ 'InvalidValuesError',
+ 'NotFoundError',
+ 'NotSupportedError',
+ 'AbortError'
+];
+Pipeline.prototype.unregisterSinkListener = function() {
+ var args = validator_.validateArgs(arguments, [
+ {
+ name: 'name',
+ type: validator_.Types.STRING
+ }
+ ]);
+
+ if (!args.has.name) {
+ throw new WebAPIException(
+ WebAPIException.INVALID_VALUES_ERR,
+ 'Invalid parameter: sink name is mandatory'
+ );
+ }
+
+ var nativeArgs = {
+ id: this._id,
+ name: args.name
+ };
+
+ var result = native_.callSync('MLPipelineUnregisterSinkListener', nativeArgs);
+ if (native_.isFailure(result)) {
+ throw native_.getErrorObjectAndValidate(
+ result,
+ ValidUnregisterSinkListenerExceptions,
+ AbortError
+ );
+ }
+
+ var listenerName = kSinkListenerNamePrefix + args.name;
+ native_.removeListener(listenerName);
+};
+//Pipeline::unregisterSinkListener() end
//Pipeline::registerCustomFilter() begin
'ml_pipeline_manager.h',
'ml_pipeline_nodeinfo.cc',
'ml_pipeline_nodeinfo.h',
+ 'ml_pipeline_sink.cc',
+ 'ml_pipeline_sink.h',
'ml_pipeline_switch.cc',
'ml_pipeline_switch.h',
'ml_pipeline_source.h',
const std::string kSize = "size";
const std::string kLocation = "location";
const std::string kShape = "shape";
+const std::string kListenerName = "listenerName";
} // namespace
using namespace common;
REGISTER_METHOD(MLPipelineGetSource);
REGISTER_METHOD(MLPipelineGetInputTensorsInfo);
REGISTER_METHOD(MLPipelineSourceInputData);
+ REGISTER_METHOD(MLPipelineRegisterSinkListener);
+ REGISTER_METHOD(MLPipelineUnregisterSinkListener);
// Pipeline API end
#undef REGISTER_METHOD
// Pipeline::getValve() end
// Pipeline::registerSinkCallback() begin
+void MlInstance::MLPipelineRegisterSinkListener(const picojson::value& args,
+ picojson::object& out) {
+ ScopeLogger("args: %s", args.serialize().c_str());
+
+ CHECK_ARGS(args, kId, double, out);
+ CHECK_ARGS(args, kName, std::string, out);
+ CHECK_ARGS(args, kListenerName, std::string, out);
+ auto pipeline_id = static_cast<int>(args.get(kId).get<double>());
+ auto sink_name = args.get(kName).get<std::string>();
+ auto listener_name = args.get(kListenerName).get<std::string>();
+
+ auto ret = pipeline_manager_.RegisterSinkListener(sink_name, pipeline_id, listener_name);
+ if (!ret) {
+ LogAndReportError(ret, &out);
+ return;
+ }
+
+ ReportSuccess(out);
+}
// Pipeline::registerSinkCallback() end
// Pipeline::unregisterSinkCallback() begin
+void MlInstance::MLPipelineUnregisterSinkListener(const picojson::value& args,
+ picojson::object& out) {
+ ScopeLogger("args: %s", args.serialize().c_str());
+
+ CHECK_ARGS(args, kId, double, out);
+ CHECK_ARGS(args, kName, std::string, out);
+ auto pipeline_id = static_cast<int>(args.get(kId).get<double>());
+ auto sink_name = args.get(kName).get<std::string>();
+
+ auto ret = pipeline_manager_.UnregisterSinkListener(sink_name, pipeline_id);
+ if (!ret) {
+ LogAndReportError(ret, &out);
+ return;
+ }
+
+ ReportSuccess(out);
+}
// Pipeline::unregisterSinkCallback() end
// Pipeline::registerCustomFilter() begin
// Pipeline::getValve() end
// Pipeline::registerSinkCallback() begin
-
+ void MLPipelineRegisterSinkListener(const picojson::value& args, picojson::object& out);
// Pipeline::registerSinkCallback() end
// Pipeline::unregisterSinkCallback() begin
-
+ void MLPipelineUnregisterSinkListener(const picojson::value& args, picojson::object& out);
// Pipeline::unregisterSinkCallback() end
// Pipeline::registerCustomFilter() begin
#include "common/logger.h"
#include "common/picojson.h"
#include "ml_pipeline.h"
+#include "ml_pipeline_sink.h"
#include "ml_utils.h"
using common::PlatformResult;
namespace ml {
Pipeline::Pipeline(int id, const std::string& state_change_listener_name,
- common::Instance* instance_ptr)
+ TensorsInfoManager* tensors_info_manager_ptr, common::Instance* instance_ptr)
: id_{id},
pipeline_{nullptr}, // this will be set to a proper pointer in CreatePipeline()
state_change_listener_name_{state_change_listener_name},
+ tensors_info_manager_ptr_{tensors_info_manager_ptr},
instance_ptr_{instance_ptr} {
ScopeLogger("id: [%d], state_change_listener_name: [%s]", id, state_change_listener_name.c_str());
}
PlatformResult Pipeline::CreatePipeline(int id, const std::string& definition,
const std::string& state_change_listener_name,
common::Instance* instance_ptr,
+ TensorsInfoManager* tensors_info_manager_ptr,
std::unique_ptr<Pipeline>* out) {
ScopeLogger("id: [%d], definition: [%s], state_change_listener_name: [%s]", id,
definition.c_str(), state_change_listener_name.c_str());
* because Pipeline is the user data for the listener registered by
* ml_pipeline_construct().
*/
- std::unique_ptr<Pipeline> pipeline_ptr{
- new (std::nothrow) Pipeline{id, state_change_listener_name, instance_ptr}};
+ std::unique_ptr<Pipeline> pipeline_ptr{new (std::nothrow) Pipeline{
+ id, state_change_listener_name, tensors_info_manager_ptr, instance_ptr}};
if (!pipeline_ptr) {
return LogAndCreateResult(ErrorCode::ABORT_ERR, "An unknown occurred.",
("Could not allocate memory for Pipeline"));
valves_.clear();
+ sinks_.clear();
+
sources_.clear();
auto ret = ml_pipeline_destroy(pipeline_);
// Pipeline::getValve() end
// Pipeline::registerSinkCallback() begin
+PlatformResult Pipeline::RegisterSinkListener(const std::string& sink_name,
+ const std::string& listener_name) {
+ ScopeLogger("sink_name: [%s], listener_name: [%s], id_: [%d]", sink_name.c_str(),
+ listener_name.c_str(), id_);
+
+ if (sinks_.count(sink_name)) {
+ LoggerD("Listener for [%s] sink is already registered", sink_name.c_str());
+ return PlatformResult{};
+ }
+
+ std::unique_ptr<Sink> sink_ptr;
+ auto ret = Sink::CreateAndRegisterSink(sink_name, listener_name, pipeline_, instance_ptr_,
+ tensors_info_manager_ptr_, &sink_ptr);
+ if (!ret) {
+ return ret;
+ }
+
+ sinks_.insert({sink_name, std::move(sink_ptr)});
+ return PlatformResult{};
+}
// Pipeline::registerSinkCallback() end
// Pipeline::unregisterSinkCallback() begin
+PlatformResult Pipeline::UnregisterSinkListener(const std::string& sink_name) {
+ ScopeLogger("sink_name: [%s], id_: [%d]", sink_name.c_str(), id_);
+
+ auto sink_it = sinks_.find(sink_name);
+ if (sinks_.end() == sink_it) {
+ LoggerD("sink [%s] not found", sink_name.c_str());
+ return PlatformResult{ErrorCode::ABORT_ERR, "An unknown error occurred"};
+ }
+ auto ret = sink_it->second->Unregister();
+ if (ret) {
+ sinks_.erase(sink_it);
+ }
+ return ret;
+}
// Pipeline::unregisterSinkCallback() end
// Pipeline::registerCustomFilter() begin
#include "common/picojson.h"
#include "common/platform_result.h"
#include "ml_pipeline_nodeinfo.h"
+#include "ml_pipeline_sink.h"
#include "ml_pipeline_source.h"
#include "ml_pipeline_switch.h"
#include "ml_pipeline_valve.h"
static PlatformResult CreatePipeline(int id, const std::string& definition,
const std::string& state_change_listener_name,
common::Instance* instance_ptr,
+ TensorsInfoManager* tensors_info_manager_ptr,
std::unique_ptr<Pipeline>* out);
// PipelineManager::createPipeline() end
Pipeline() = delete;
// Pipeline::getValve() end
// Pipeline::registerSinkCallback() begin
-
+ PlatformResult RegisterSinkListener(const std::string& sink_name,
+ const std::string& listener_name);
// Pipeline::registerSinkCallback() end
// Pipeline::unregisterSinkCallback() begin
-
+ PlatformResult UnregisterSinkListener(const std::string& sink_name);
// Pipeline::unregisterSinkCallback() end
// Pipeline::registerCustomFilter() begin
PlatformResult GetValve(const std::string& name, Valve** out);
// Valve::setOpen() end
private:
- Pipeline(int id, const std::string& state_change_listener_name, common::Instance* instance_ptr);
+ Pipeline(int id, const std::string& state_change_listener_name,
+ TensorsInfoManager* tensors_info_manager_ptr, common::Instance* instance_ptr);
const int id_;
ml_pipeline_h pipeline_;
const std::string state_change_listener_name_;
+ TensorsInfoManager* tensors_info_manager_ptr_;
common::Instance* instance_ptr_;
/* ######### VERY IMPORTANT #########
std::unordered_map<std::string, std::unique_ptr<Switch>> switches_;
std::map<std::string, std::unique_ptr<NodeInfo>> node_info_;
std::unordered_map<std::string, std::unique_ptr<Valve>> valves_;
+ std::unordered_map<std::string, std::unique_ptr<Sink>> sinks_;
std::map<std::string, std::unique_ptr<Source>> sources_;
static void PipelineStateChangeListener(ml_pipeline_state_e state, void* user_data);
std::unique_ptr<Pipeline> pipeline_ptr;
auto ret = Pipeline::CreatePipeline(id, definition, state_change_listener_name, instance_ptr_,
- &pipeline_ptr);
+ tensors_info_manager_, &pipeline_ptr);
if (!ret) {
return ret;
}
// Pipeline::getValve() end
// Pipeline::registerSinkCallback() begin
+PlatformResult PipelineManager::RegisterSinkListener(const std::string& sink_name, int pipeline_id,
+ const std::string& listener_name) {
+ ScopeLogger("sink_name: [%s], pipeline_id: [%d], listener_name: [%s]", sink_name.c_str(),
+ pipeline_id, listener_name.c_str());
+ auto pipeline_it = pipelines_.find(pipeline_id);
+ if (pipelines_.end() == pipeline_it) {
+ LoggerD("Pipeline not found: [%d]", pipeline_id);
+ return PlatformResult{ErrorCode::NOT_FOUND_ERR, "Pipeline not found"};
+ }
+
+ return pipeline_it->second->RegisterSinkListener(sink_name, listener_name);
+}
// Pipeline::registerSinkCallback() end
// Pipeline::unregisterSinkCallback() begin
+PlatformResult PipelineManager::UnregisterSinkListener(const std::string& sink_name,
+ int pipeline_id) {
+ ScopeLogger("sink_name: [%s], pipeline_id: [%d]", sink_name.c_str(), pipeline_id);
+ auto pipeline_it = pipelines_.find(pipeline_id);
+ if (pipelines_.end() == pipeline_it) {
+ LoggerD("Pipeline not found: [%d]", pipeline_id);
+ return PlatformResult{ErrorCode::NOT_FOUND_ERR, "Pipeline not found"};
+ }
+
+ return pipeline_it->second->UnregisterSinkListener(sink_name);
+}
// Pipeline::unregisterSinkCallback() end
// Pipeline::registerCustomFilter() begin
// Pipeline::getValve() end
// Pipeline::registerSinkCallback() begin
-
+ PlatformResult RegisterSinkListener(const std::string& sink_name, int pipeline_id,
+ const std::string& listener_name);
// Pipeline::registerSinkCallback() end
// Pipeline::unregisterSinkCallback() begin
-
+ PlatformResult UnregisterSinkListener(const std::string& sink_name, int pipeline_id);
// Pipeline::unregisterSinkCallback() end
// Pipeline::registerCustomFilter() begin
--- /dev/null
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <utility>
+
+#include "ml_pipeline_sink.h"
+#include "ml_utils.h"
+
+using common::PlatformResult;
+using common::ErrorCode;
+
+namespace {
+
+const std::string kListenerId = "listenerId";
+const std::string kTensorsDataId = "tensorsDataId";
+const std::string kTensorsInfoId = "tensorsInfoId";
+
+} // namespace
+
+namespace extension {
+namespace ml {
+namespace pipeline {
+
+PlatformResult Sink::CreateAndRegisterSink(const std::string& name,
+ const std::string& listener_name, ml_pipeline_h pipeline,
+ common::Instance* instance_ptr,
+ TensorsInfoManager* tensors_info_manager_ptr,
+ std::unique_ptr<Sink>* out) {
+ ScopeLogger(
+ "name: [%s], listener_name: [%s], pipeline: [%p], instance_ptr: [%p], "
+ "tensors_info_manager_ptr: [%p]",
+ name.c_str(), listener_name.c_str(), pipeline, instance_ptr, tensors_info_manager_ptr);
+
+ auto sink_ptr = std::unique_ptr<Sink>(
+ new (std::nothrow) Sink{name, listener_name, instance_ptr, tensors_info_manager_ptr});
+ if (!sink_ptr) {
+ return LogAndCreateResult(ErrorCode::ABORT_ERR, "Could not register sink listener",
+ ("Could not allocate memory"));
+ }
+
+ ml_pipeline_sink_h sink_handle = nullptr;
+ auto ret = ml_pipeline_sink_register(pipeline, name.c_str(), SinkListener,
+ static_cast<void*>(sink_ptr.get()), &sink_handle);
+ if (ML_ERROR_NONE != ret) {
+ LoggerE("ml_pipeline_sink_register() failed: [%d] (%s)", ret, get_error_message(ret));
+ return util::ToPlatformResult(ret, "Could not register sink listener");
+ }
+ LoggerD("ml_pipeline_sink_register() succeeded");
+ sink_ptr->sink_ = sink_handle;
+
+ *out = std::move(sink_ptr);
+
+ return PlatformResult{};
+}
+
+Sink::Sink(const std::string& name, const std::string& listener_name,
+ common::Instance* instance_ptr, TensorsInfoManager* tensors_info_manager_ptr)
+ : name_{name},
+ listener_name_{listener_name},
+ sink_{nullptr},
+ instance_ptr_{instance_ptr},
+ tensors_info_manager_ptr_{tensors_info_manager_ptr} {
+ ScopeLogger(
+ "name: [%s]_, listener_name: [%s]_, instance_ptr_: [%p], tensors_info_manager_ptr_: [%p]",
+ name.c_str(), listener_name.c_str(), instance_ptr_, tensors_info_manager_ptr_);
+}
+
+Sink::~Sink() {
+ ScopeLogger("name: [%s]_, listener_name_: [%s], sink_: [%p], instance_ptr_: [%p]", name_.c_str(),
+ listener_name_.c_str(), sink_, instance_ptr_);
+
+ Unregister();
+}
+
+void Sink::SinkListener(ml_tensors_data_h tensors_data_src, ml_tensors_info_h tensors_info_src,
+ void* user_data) {
+ ScopeLogger("tensors_data_src: [%p], tensors_info_src: [%p], user_data: [%p]", tensors_data_src,
+ tensors_info_src, user_data);
+
+ if (!user_data) {
+ LoggerE("user_data is a nullptr");
+ return;
+ }
+
+ Sink* sink_ptr = static_cast<Sink*>(user_data);
+
+ auto* tensors_data_ptr = sink_ptr->tensors_info_manager_ptr_->CloneNativeTensorWithData(
+ tensors_info_src, tensors_data_src);
+ if (!tensors_data_ptr) {
+ LoggerE("Could not clone tensors data. Sink listener won't be triggered.");
+ return;
+ }
+
+ picojson::value response{picojson::object{}};
+ auto& response_obj = response.get<picojson::object>();
+ response_obj[kListenerId] = picojson::value{sink_ptr->listener_name_};
+ response_obj[kTensorsDataId] = picojson::value{static_cast<double>(tensors_data_ptr->Id())};
+ response_obj[kTensorsInfoId] =
+ picojson::value{static_cast<double>(tensors_data_ptr->TensorsInfoId())};
+
+ common::Instance::PostMessage(sink_ptr->instance_ptr_, response);
+}
+
+PlatformResult Sink::Unregister() {
+ ScopeLogger("name_: [%s], listener_name_: [%s], sink_: [%p], instance_ptr_: [%p]", name_.c_str(),
+ listener_name_.c_str(), sink_, instance_ptr_);
+
+ if (!sink_) {
+ LoggerD("Sink was already unregistered");
+ return PlatformResult{};
+ }
+
+ auto ret = ml_pipeline_sink_unregister(sink_);
+ if (ML_ERROR_NONE != ret) {
+ LoggerE("ml_pipeline_sink_unregister() failed: [%d] (%s)", ret, get_error_message(ret));
+ return util::ToPlatformResult(ret, "Could not unregister sink");
+ }
+ LoggerD("ml_pipeline_sink_unregister() succeeded");
+
+ sink_ = nullptr;
+
+ return PlatformResult{};
+}
+
+} // namespace pipeline
+} // namespace ml
+} // namespace extension
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ML_ML_PIPELINE_SINK_H_
+#define ML_ML_PIPELINE_SINK_H_
+
+#include <memory>
+#include <string>
+
+#include <nnstreamer/nnstreamer.h>
+
+#include "common/extension.h"
+#include "common/platform_result.h"
+
+#include "ml_tensors_data_manager.h"
+#include "ml_tensors_info_manager.h"
+
+using common::PlatformResult;
+
+namespace extension {
+namespace ml {
+namespace pipeline {
+
+class Sink {
+ public:
+ static PlatformResult CreateAndRegisterSink(const std::string& name,
+ const std::string& listener_name,
+ ml_pipeline_h pipeline,
+ common::Instance* instance_ptr,
+ TensorsInfoManager* tensors_info_manager_ptr,
+ std::unique_ptr<Sink>* out);
+
+ ~Sink();
+
+ PlatformResult Unregister();
+
+ Sink(const Sink&) = delete;
+ Sink& operator=(const Sink&) = delete;
+
+ private:
+ Sink(const std::string& name, const std::string& listener_name, common::Instance* instance_ptr,
+ TensorsInfoManager* tensors_info_manager_pt);
+
+ static void SinkListener(ml_tensors_data_h tensors_data, ml_tensors_info_h tensors_info,
+ void* user_data);
+
+ const std::string name_;
+ const std::string listener_name_;
+ ml_pipeline_sink_h sink_;
+ common::Instance* instance_ptr_;
+ TensorsInfoManager* tensors_info_manager_ptr_;
+};
+
+} // namespace pipeline
+} // namespace ml
+} // namespace extension
+
+#endif // ML_ML_PIPELINE_SINK_H_