1 /* SPDX-License-Identifier: LGPL-2.1-only */
3 * Copyright (C) 2022 Samsung Electronics Co., Ltd.
7 * @brief Publish incoming streams
8 * @author Yechan Choi <yechan9.choi@samsung.com>
9 * @see http://github.com/nnstreamer/nnstreamer
17 #include "edge_sink.h"
19 GST_DEBUG_CATEGORY_STATIC (gst_edgesink_debug);
20 #define GST_CAT_DEFAULT gst_edgesink_debug
23 * @brief the capabilities of the inputs.
25 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
31 * @brief edgesink properties
44 PROP_CONNECTION_TIMEOUT,
48 #define DEFAULT_MQTT_HOST "127.0.0.1"
49 #define DEFAULT_MQTT_PORT 1883
51 #define gst_edgesink_parent_class parent_class
52 G_DEFINE_TYPE (GstEdgeSink, gst_edgesink, GST_TYPE_BASE_SINK);
54 static void gst_edgesink_set_property (GObject * object,
55 guint prop_id, const GValue * value, GParamSpec * pspec);
57 static void gst_edgesink_get_property (GObject * object,
58 guint prop_id, GValue * value, GParamSpec * pspec);
60 static void gst_edgesink_finalize (GObject * object);
62 static gboolean gst_edgesink_start (GstBaseSink * basesink);
63 static GstFlowReturn gst_edgesink_render (GstBaseSink * basesink,
65 static gboolean gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps);
67 static gchar *gst_edgesink_get_host (GstEdgeSink * self);
68 static void gst_edgesink_set_host (GstEdgeSink * self, const gchar * host);
70 static guint16 gst_edgesink_get_port (GstEdgeSink * self);
71 static void gst_edgesink_set_port (GstEdgeSink * self, const guint16 port);
73 static nns_edge_connect_type_e gst_edgesink_get_connect_type (GstEdgeSink *
75 static void gst_edgesink_set_connect_type (GstEdgeSink * self,
76 const nns_edge_connect_type_e connect_type);
79 * @brief initialize the class
82 gst_edgesink_class_init (GstEdgeSinkClass * klass)
84 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
85 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
86 GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
88 gobject_class->set_property = gst_edgesink_set_property;
89 gobject_class->get_property = gst_edgesink_get_property;
90 gobject_class->finalize = gst_edgesink_finalize;
92 g_object_class_install_property (gobject_class, PROP_HOST,
93 g_param_spec_string ("host", "Host",
94 "A self host address to accept connection from edgesrc", DEFAULT_HOST,
95 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
96 g_object_class_install_property (gobject_class, PROP_PORT,
97 g_param_spec_uint ("port", "Port",
98 "A self port address to accept connection from edgesrc. "
99 "If the port is set to 0 then the available port is allocated. "
100 "If the connect-type is AITT then the port setting is not required.",
101 0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
102 g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
103 g_param_spec_enum ("connect-type", "Connect Type",
104 "The connections type between edgesink and edgesrc.",
105 GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
106 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
107 g_object_class_install_property (gobject_class, PROP_DEST_HOST,
108 g_param_spec_string ("dest-host", "Destination Host",
109 "The destination hostname of the broker", DEFAULT_MQTT_HOST,
110 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
111 g_object_class_install_property (gobject_class, PROP_DEST_PORT,
112 g_param_spec_uint ("dest-port", "Destination Port",
113 "The destination port of the broker", 0,
114 65535, DEFAULT_MQTT_PORT,
115 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
116 g_object_class_install_property (gobject_class, PROP_TOPIC,
117 g_param_spec_string ("topic", "Topic",
118 "The main topic of the host and option if necessary. "
119 "(topic)/(optional topic for main topic).", "",
120 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
121 g_object_class_install_property (gobject_class, PROP_WAIT_CONNECTION,
122 g_param_spec_boolean ("wait-connection", "Wait connection to edgesrc",
123 "Wait until edgesink is connected to edgesrc. "
124 "In case of false(default), the buffers entering the edgesink are dropped.",
125 FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
126 g_object_class_install_property (gobject_class, PROP_CONNECTION_TIMEOUT,
127 g_param_spec_uint64 ("connection-timeout",
128 "Timeout for wating a connection",
129 "The timeout (in milliseconds) for waiting a connection to receiver. "
130 "0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0,
131 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133 gst_element_class_add_pad_template (gstelement_class,
134 gst_static_pad_template_get (&sinktemplate));
136 gst_element_class_set_static_metadata (gstelement_class,
137 "EdgeSink", "Sink/Edge",
138 "Publish incoming streams", "Samsung Electronics Co., Ltd.");
140 gstbasesink_class->start = gst_edgesink_start;
141 gstbasesink_class->render = gst_edgesink_render;
142 gstbasesink_class->set_caps = gst_edgesink_set_caps;
144 GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
145 GST_EDGE_ELEM_NAME_SINK, 0, "Edge sink");
149 * @brief initialize the new element
152 gst_edgesink_init (GstEdgeSink * self)
154 self->host = g_strdup (DEFAULT_HOST);
155 self->port = DEFAULT_PORT;
156 self->dest_host = g_strdup (DEFAULT_HOST);
157 self->dest_port = DEFAULT_PORT;
159 self->connect_type = DEFAULT_CONNECT_TYPE;
160 self->wait_connection = FALSE;
161 self->connection_timeout = 0;
165 * @brief set property
168 gst_edgesink_set_property (GObject * object, guint prop_id,
169 const GValue * value, GParamSpec * pspec)
171 GstEdgeSink *self = GST_EDGESINK (object);
175 gst_edgesink_set_host (self, g_value_get_string (value));
178 gst_edgesink_set_port (self, g_value_get_uint (value));
181 if (!g_value_get_string (value)) {
182 nns_logw ("dest host property cannot be NULL");
185 g_free (self->dest_host);
186 self->dest_host = g_value_dup_string (value);
189 self->dest_port = g_value_get_uint (value);
191 case PROP_CONNECT_TYPE:
192 gst_edgesink_set_connect_type (self, g_value_get_enum (value));
195 if (!g_value_get_string (value)) {
196 nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
199 g_free (self->topic);
200 self->topic = g_value_dup_string (value);
202 case PROP_WAIT_CONNECTION:
203 self->wait_connection = g_value_get_boolean (value);
205 case PROP_CONNECTION_TIMEOUT:
206 self->connection_timeout = g_value_get_uint64 (value);
209 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
215 * @brief get property
218 gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
221 GstEdgeSink *self = GST_EDGESINK (object);
225 g_value_set_string (value, gst_edgesink_get_host (self));
228 g_value_set_uint (value, gst_edgesink_get_port (self));
231 g_value_set_string (value, self->dest_host);
234 g_value_set_uint (value, self->dest_port);
236 case PROP_CONNECT_TYPE:
237 g_value_set_enum (value, gst_edgesink_get_connect_type (self));
240 g_value_set_string (value, self->topic);
242 case PROP_WAIT_CONNECTION:
243 g_value_set_boolean (value, self->wait_connection);
245 case PROP_CONNECTION_TIMEOUT:
246 g_value_set_uint64 (value, self->connection_timeout);
249 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
255 * @brief finalize the object
258 gst_edgesink_finalize (GObject * object)
260 GstEdgeSink *self = GST_EDGESINK (object);
265 g_free (self->dest_host);
266 self->dest_host = NULL;
268 g_free (self->topic);
272 nns_edge_release_handle (self->edge_h);
276 G_OBJECT_CLASS (parent_class)->finalize (object);
280 * @brief start processing of edgesink
283 gst_edgesink_start (GstBaseSink * basesink)
285 GstEdgeSink *self = GST_EDGESINK (basesink);
291 nns_edge_create_handle (NULL, self->connect_type,
292 NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
294 if (NNS_EDGE_ERROR_NONE != ret) {
295 nns_loge ("Failed to get nnstreamer edge handle.");
298 nns_edge_release_handle (self->edge_h);
306 nns_edge_set_info (self->edge_h, "HOST", self->host);
307 if (self->port > 0) {
308 port = g_strdup_printf ("%u", self->port);
309 nns_edge_set_info (self->edge_h, "PORT", port);
313 nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
314 if (self->dest_port > 0) {
315 port = g_strdup_printf ("%u", self->dest_port);
316 nns_edge_set_info (self->edge_h, "DEST_PORT", port);
320 nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
322 if (0 != nns_edge_start (self->edge_h)) {
324 ("Failed to start NNStreamer-edge. Please check server IP and port");
328 if (self->wait_connection) {
329 guint64 remaining = self->connection_timeout;
331 remaining = G_MAXUINT64;
333 while (remaining >= 10 &&
334 NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
335 if (!self->wait_connection) {
337 ("Waiting for connection to edgesrc was canceled by the user.");
345 NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
346 g_usleep (remaining * 1000U);
349 if (NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
350 nns_loge ("Failed to connect to edgesrc within timeout: %ju ms",
351 self->connection_timeout);
360 * @brief render buffer, send buffer
363 gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buffer)
365 GstEdgeSink *self = GST_EDGESINK (basesink);
366 nns_edge_data_h data_h;
369 GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
370 GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
372 ret = nns_edge_data_create (&data_h);
373 if (ret != NNS_EDGE_ERROR_NONE) {
374 nns_loge ("Failed to create data handle in edgesink");
375 return GST_FLOW_ERROR;
378 num_mems = gst_buffer_n_memory (buffer);
379 for (i = 0; i < num_mems; i++) {
380 mem[i] = gst_buffer_peek_memory (buffer, i);
381 if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
382 nns_loge ("Cannot map the %uth memory in gst-buffer", i);
386 nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
389 nns_edge_send (self->edge_h, data_h);
394 nns_edge_data_destroy (data_h);
396 for (i = 0; i < num_mems; i++) {
397 gst_memory_unmap (mem[i], &map[i]);
404 * @brief An implementation of the set_caps vmethod in GstBaseSinkClass
407 gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps)
409 GstEdgeSink *sink = GST_EDGESINK (basesink);
410 gchar *caps_str, *prev_caps_str, *new_caps_str;
413 caps_str = gst_caps_to_string (caps);
415 nns_edge_get_info (sink->edge_h, "CAPS", &prev_caps_str);
416 if (!prev_caps_str) {
417 prev_caps_str = g_strdup ("");
420 g_strdup_printf ("%s@edge_sink_caps@%s", prev_caps_str, caps_str);
421 set_rst = nns_edge_set_info (sink->edge_h, "CAPS", new_caps_str);
423 g_free (prev_caps_str);
424 g_free (new_caps_str);
427 return set_rst == NNS_EDGE_ERROR_NONE;
431 * @brief getter for the 'host' property.
434 gst_edgesink_get_host (GstEdgeSink * self)
440 * @brief setter for the 'host' property.
443 gst_edgesink_set_host (GstEdgeSink * self, const gchar * host)
447 self->host = g_strdup (host);
451 * @brief getter for the 'port' property.
454 gst_edgesink_get_port (GstEdgeSink * self)
460 * @brief setter for the 'port' property.
463 gst_edgesink_set_port (GstEdgeSink * self, const guint16 port)
469 * @brief getter for the 'connect_type' property.
471 static nns_edge_connect_type_e
472 gst_edgesink_get_connect_type (GstEdgeSink * self)
474 return self->connect_type;
478 * @brief setter for the 'connect_type' property.
481 gst_edgesink_set_connect_type (GstEdgeSink * self,
482 const nns_edge_connect_type_e connect_type)
484 self->connect_type = connect_type;