2 * Copyright (c) 2020 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include "common/logger.h"
19 #include "common/picojson.h"
20 #include "ml_pipeline.h"
21 #include "ml_pipeline_sink.h"
24 using common::PlatformResult;
25 using common::ErrorCode;
29 const std::string kListenerId = "listenerId";
30 const std::string kState = "state";
32 std::string StateToString(ml_pipeline_state_e state) {
33 ScopeLogger("state: [%d]", state);
35 std::string state_str;
37 case ML_PIPELINE_STATE_UNKNOWN:
38 state_str = "UNKNOWN";
40 case ML_PIPELINE_STATE_NULL:
43 case ML_PIPELINE_STATE_READY:
46 case ML_PIPELINE_STATE_PAUSED:
49 case ML_PIPELINE_STATE_PLAYING:
50 state_str = "PLAYING";
53 LoggerE("Illegal ml_pipeline_state_e value: [%d]", state);
54 state_str = "UNKNOWN";
57 LoggerD("state_str: [%s]", state_str.c_str());
66 Pipeline::Pipeline(int id, const std::string& state_change_listener_name,
67 TensorsInfoManager* tensors_info_manager_ptr, common::Instance* instance_ptr)
69 pipeline_{nullptr}, // this will be set to a proper pointer in CreatePipeline()
70 state_change_listener_name_{state_change_listener_name},
71 tensors_info_manager_ptr_{tensors_info_manager_ptr},
72 instance_ptr_{instance_ptr} {
73 ScopeLogger("id: [%d], state_change_listener_name: [%s]", id, state_change_listener_name.c_str());
76 // Pipeline::createPipeline() begin
77 PlatformResult Pipeline::CreatePipeline(int id, const std::string& definition,
78 const std::string& state_change_listener_name,
79 common::Instance* instance_ptr,
80 TensorsInfoManager* tensors_info_manager_ptr,
81 std::unique_ptr<Pipeline>* out) {
82 ScopeLogger("id: [%d], definition: [%s], state_change_listener_name: [%s]", id,
83 definition.c_str(), state_change_listener_name.c_str());
85 /* We need to create the Pipeline object before setting its pipeline_ member,
86 * because Pipeline is the user data for the listener registered by
87 * ml_pipeline_construct().
89 std::unique_ptr<Pipeline> pipeline_ptr{new (std::nothrow) Pipeline{
90 id, state_change_listener_name, tensors_info_manager_ptr, instance_ptr}};
92 return LogAndCreateResult(ErrorCode::ABORT_ERR, "An unknown occurred.",
93 ("Could not allocate memory for Pipeline"));
96 int ret = ML_ERROR_UNKNOWN;
97 if (state_change_listener_name == "") {
98 ret = ml_pipeline_construct(definition.c_str(), nullptr, nullptr, &pipeline_ptr->pipeline_);
100 ret = ml_pipeline_construct(definition.c_str(), PipelineStateChangeListener,
101 static_cast<void*>(pipeline_ptr.get()), &pipeline_ptr->pipeline_);
104 if (ML_ERROR_NONE != ret) {
105 LoggerE("ml_pipeline_construct() failed: [%d] (%s)", ret, get_error_message(ret));
106 if (ML_ERROR_STREAMS_PIPE == ret) {
107 return PlatformResult{
108 ErrorCode::ABORT_ERR,
109 "Could not create pipeline: invalid pipeline description or an internal error"};
111 return util::ToPlatformResult(ret, "Could not create a pipeline");
113 LoggerD("ml_pipeline_construct() succeeded");
115 *out = std::move(pipeline_ptr);
116 return PlatformResult{};
118 // Pipeline::createPipeline() end
120 Pipeline::~Pipeline() {
121 ScopeLogger("Destroying pipeline: [%d]", id_);
124 LoggerD("pipeline_ has already been destroyed");
131 void Pipeline::PipelineStateChangeListener(ml_pipeline_state_e state, void* user_data) {
132 ScopeLogger("state: [%s]", StateToString(state).c_str());
134 Pipeline* pipeline = static_cast<Pipeline*>(user_data);
136 picojson::value response{picojson::object{}};
137 response.get<picojson::object>()[kListenerId] =
138 picojson::value{pipeline->state_change_listener_name_};
139 response.get<picojson::object>()[kState] = picojson::value{StateToString(state)};
141 common::Instance::PostMessage(pipeline->instance_ptr_, response);
144 // Pipeline::state begin
145 PlatformResult Pipeline::GetState(std::string* out) {
146 ScopeLogger("id_: [%d]", id_);
148 ml_pipeline_state_e state = ML_PIPELINE_STATE_UNKNOWN;
149 auto ret = ml_pipeline_get_state(pipeline_, &state);
150 if (ML_ERROR_NONE != ret) {
151 LoggerE("ml_pipeline_get_state() failed: [%d] (%s)", ret, get_error_message(ret));
152 return util::ToPlatformResult(ret, "Could not get pipeline state");
154 LoggerD("ml_pipeline_get_state() succeeded");
156 *out = StateToString(state);
157 return PlatformResult{};
159 // Pipeline::state end
161 // Pipeline::start() begin
162 PlatformResult Pipeline::Start() {
163 ScopeLogger("id_: [%d]", id_);
165 auto ret = ml_pipeline_start(pipeline_);
166 if (ML_ERROR_NONE != ret) {
167 LoggerE("ml_pipeline_start() failed: [%d] (%s)", ret, get_error_message(ret));
168 return util::ToPlatformResult(ret, "Could not start pipeline");
170 return PlatformResult{};
172 // Pipeline::start() end
174 // Pipeline::stop() begin
175 PlatformResult Pipeline::Stop() {
176 ScopeLogger("id_: [%d]", id_);
178 auto ret = ml_pipeline_stop(pipeline_);
179 if (ML_ERROR_NONE != ret) {
180 LoggerE("ml_pipeline_stop() failed: [%d] (%s)", ret, get_error_message(ret));
181 return util::ToPlatformResult(ret, "Could not stop pipeline");
183 return PlatformResult{};
185 // Pipeline::stop() end
187 // Pipeline::dispose() begin
188 PlatformResult Pipeline::Dispose() {
189 ScopeLogger("id_: [%d]", id_);
192 * TODO in future commits:
194 * Release all nodes belonging to this pipeline and
195 * cached in this object in containers like
196 * switches_, node_infos_, etc.
198 * They have to be released HERE (i.e. BEFORE releasing pipeline_).
199 * If they're released after pipeline_, the app may crash.
211 auto ret = ml_pipeline_destroy(pipeline_);
212 if (ML_ERROR_NONE != ret) {
213 LoggerE("ml_pipeline_destroy() failed: [%d] (%s)", ret, get_error_message(ret));
214 return util::ToPlatformResult(ret, "Could not dispose the pipeline");
216 LoggerD("ml_pipeline_destroy() succeeded");
220 return PlatformResult{};
222 // Pipeline::dispose() end
224 // Pipeline::getNodeInfo() begin
225 PlatformResult Pipeline::GetNodeInfo(const std::string& name) {
226 ScopeLogger("id_: [%d], name: [%s]", id_, name.c_str());
228 auto nodeinfo_it = node_info_.find(name);
229 if (node_info_.end() != nodeinfo_it) {
230 LoggerD("NodeInfo [%s] found", name.c_str());
231 return PlatformResult{};
234 std::unique_ptr<NodeInfo> node_info_ptr;
235 auto ret = NodeInfo::CreateNodeInfo(pipeline_, name, &node_info_ptr);
241 node_info_.insert({name, std::move(node_info_ptr)});
243 return PlatformResult{};
245 // Pipeline::getNodeInfo() end
247 // Pipeline::getSource() begin
248 PlatformResult Pipeline::GetSource(const std::string& name) {
249 ScopeLogger("id: [%d], name: [%s]", id_, name.c_str());
251 auto source_it = sources_.find(name);
252 if (sources_.end() != source_it) {
253 LoggerD("Source [%s] found", name.c_str());
254 return PlatformResult{};
257 std::unique_ptr<Source> source_ptr;
258 auto ret = Source::CreateSource(name, pipeline_, source_ptr);
260 sources_.insert({name, std::move(source_ptr)});
264 // Pipeline::getSource() end
266 // Pipeline::getSwitch() begin
267 PlatformResult Pipeline::GetSwitch(const std::string& name, std::string* type) {
268 ScopeLogger("id: [%d], name: [%s]", id_, name.c_str());
270 auto switch_it = switches_.find(name);
271 if (switches_.end() != switch_it) {
272 LoggerD("Switch [%s] found", name.c_str());
273 *type = switch_it->second->GetType();
274 return PlatformResult{};
276 LoggerD("Switch [%s] not found", name.c_str());
278 std::unique_ptr<Switch> switch_ptr;
279 auto ret = Switch::CreateSwitch(name, pipeline_, switch_ptr);
281 *type = switch_ptr->GetType();
282 switches_.insert({name, std::move(switch_ptr)});
286 // Pipeline::getSwitch() end
288 // Pipeline::getValve() begin
289 PlatformResult Pipeline::GetValve(const std::string& name) {
290 ScopeLogger("id: [%d], name: [%s]", id_, name.c_str());
292 auto valve_it = valves_.find(name);
293 if (valves_.end() != valve_it) {
294 LoggerD("Valve [%s] found", name.c_str());
295 return PlatformResult{};
297 LoggerD("Creating [%s] Valve", name.c_str());
299 std::unique_ptr<Valve> valve_ptr;
300 auto ret = Valve::CreateValve(name, pipeline_, *this, valve_ptr);
302 valves_.insert({name, std::move(valve_ptr)});
306 // Pipeline::getValve() end
308 // Pipeline::registerSinkCallback() begin
309 PlatformResult Pipeline::RegisterSinkListener(const std::string& sink_name,
310 const std::string& listener_name) {
311 ScopeLogger("sink_name: [%s], listener_name: [%s], id_: [%d]", sink_name.c_str(),
312 listener_name.c_str(), id_);
314 if (sinks_.count(sink_name)) {
315 LoggerD("Listener for [%s] sink is already registered", sink_name.c_str());
316 return PlatformResult{};
319 std::unique_ptr<Sink> sink_ptr;
320 auto ret = Sink::CreateAndRegisterSink(sink_name, listener_name, pipeline_, instance_ptr_,
321 tensors_info_manager_ptr_, &sink_ptr);
326 sinks_.insert({sink_name, std::move(sink_ptr)});
328 return PlatformResult{};
330 // Pipeline::registerSinkCallback() end
332 // Pipeline::unregisterSinkCallback() begin
333 PlatformResult Pipeline::UnregisterSinkListener(const std::string& sink_name) {
334 ScopeLogger("sink_name: [%s], id_: [%d]", sink_name.c_str(), id_);
336 auto sink_it = sinks_.find(sink_name);
337 if (sinks_.end() == sink_it) {
338 LoggerD("sink [%s] not found", sink_name.c_str());
339 return PlatformResult{ErrorCode::INVALID_VALUES_ERR, "The sink has not been registered"};
342 auto ret = sink_it->second->Unregister();
344 sinks_.erase(sink_it);
348 // Pipeline::unregisterSinkCallback() end
350 // NodeInfo::getProperty() begin
351 PlatformResult Pipeline::getProperty(const std::string& node_name, const std::string& name,
352 const std::string& type, picojson::object* property) {
353 ScopeLogger("id_: [%d], name: [%s], type: [%s]", id_, name.c_str(), type.c_str());
355 auto nodeinfo_it = node_info_.find(node_name);
356 if (node_info_.end() == nodeinfo_it) {
357 LoggerD("NodeInfo [%s] not found", node_name.c_str());
358 return PlatformResult{ErrorCode::NOT_FOUND_ERR, "NodeInfo not found"};
361 return nodeinfo_it->second->getProperty(name, type, property);
363 // NodeInfo::getProperty() end
365 // NodeInfo::setProperty() begin
366 PlatformResult Pipeline::setProperty(const std::string& node_name, const std::string& name,
367 const std::string& type, const picojson::value& property) {
368 ScopeLogger("id_: [%d], name: [%s], type: [%s]", id_, name.c_str(), type.c_str());
370 auto nodeinfo_it = node_info_.find(node_name);
371 if (node_info_.end() == nodeinfo_it) {
372 LoggerD("NodeInfo [%s] not found", node_name.c_str());
373 return PlatformResult{ErrorCode::NOT_FOUND_ERR, "NodeInfo not found"};
376 return nodeinfo_it->second->setProperty(name, type, property);
378 // NodeInfo::setProperty() end
380 // Source::inputTensorsInfo begin
381 PlatformResult Pipeline::getInputTensorsInfo(const std::string& name, ml_tensors_info_h* result) {
384 auto source_it = sources_.find(name);
385 if (sources_.end() == source_it) {
386 LoggerD("Source [%s] not found", name.c_str());
387 return PlatformResult{ErrorCode::NOT_FOUND_ERR, "Source not found"};
390 return source_it->second->getInputTensorsInfo(result);
392 // Source::inputTensorsInfo end
394 // Source::inputData() begin
395 PlatformResult Pipeline::SourceInputData(const std::string& name, TensorsData* tensors_data) {
398 auto source_it = sources_.find(name);
399 if (sources_.end() == source_it) {
400 LoggerD("Source [%s] not found", name.c_str());
401 return PlatformResult{ErrorCode::NOT_FOUND_ERR, "Source not found"};
404 return source_it->second->SourceInputData(tensors_data);
406 // Source::inputData() end
408 // Switch::getPadList() begin
409 PlatformResult Pipeline::GetSwitch(const std::string& name, Switch** out) {
410 ScopeLogger("id: [%d], name: [%s]", id_, name.c_str());
412 auto switch_it = switches_.find(name);
413 if (switches_.end() != switch_it) {
414 LoggerD("Switch [%s] found", name.c_str());
415 *out = switch_it->second.get();
416 return PlatformResult{};
418 LoggerE("Switch [%s] not found", name.c_str());
419 return PlatformResult{ErrorCode::ABORT_ERR, "Switch does not exist"};
422 // Switch::getPadList() end
424 // Valve::setOpen() begin
425 PlatformResult Pipeline::GetNodeInfo(const std::string& name, NodeInfo** out) {
426 ScopeLogger("id_: [%d], name: [%s]", id_, name.c_str());
428 auto ret = GetNodeInfo(name);
430 *out = node_info_[name].get();
436 PlatformResult Pipeline::GetValve(const std::string& name, Valve** out) {
437 ScopeLogger("id: [%d], name: [%s]", id_, name.c_str());
439 auto ret = GetValve(name);
441 *out = valves_[name].get();
445 // Valve::setOpen() end
447 } // namespace extension