[edge] Deprecate port and host property of edgesrc and update description for port...
[platform/upstream/nnstreamer.git] / gst / edge / edge_sink.c
1 /* SPDX-License-Identifier: LGPL-2.1-only */
2 /**
3  * Copyright (C) 2022 Samsung Electronics Co., Ltd.
4  *
5  * @file    edge_sink.c
6  * @date    01 Aug 2022
7  * @brief   Publish incoming streams
8  * @author  Yechan Choi <yechan9.choi@samsung.com>
9  * @see     http://github.com/nnstreamer/nnstreamer
10  * @bug     No known bugs
11  *
12  */
13 #ifdef HAVE_CONFIG_H
14 #include <config.h>
15 #endif
16
17 #include "edge_sink.h"
18
19 GST_DEBUG_CATEGORY_STATIC (gst_edgesink_debug);
20 #define GST_CAT_DEFAULT gst_edgesink_debug
21
22 /**
23  * @brief the capabilities of the inputs.
24  */
25 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
26     GST_PAD_SINK,
27     GST_PAD_ALWAYS,
28     GST_STATIC_CAPS_ANY);
29
30 /**
31  * @brief edgesink properties
32  */
33 enum
34 {
35   PROP_0,
36
37   PROP_HOST,
38   PROP_PORT,
39   PROP_DEST_HOST,
40   PROP_DEST_PORT,
41   PROP_CONNECT_TYPE,
42   PROP_TOPIC,
43   PROP_WAIT_CONNECTION,
44   PROP_CONNECTION_TIMEOUT,
45
46   PROP_LAST
47 };
48 #define DEFAULT_MQTT_HOST "127.0.0.1"
49 #define DEFAULT_MQTT_PORT 1883
50
51 #define gst_edgesink_parent_class parent_class
52 G_DEFINE_TYPE (GstEdgeSink, gst_edgesink, GST_TYPE_BASE_SINK);
53
54 static void gst_edgesink_set_property (GObject * object,
55     guint prop_id, const GValue * value, GParamSpec * pspec);
56
57 static void gst_edgesink_get_property (GObject * object,
58     guint prop_id, GValue * value, GParamSpec * pspec);
59
60 static void gst_edgesink_finalize (GObject * object);
61
62 static gboolean gst_edgesink_start (GstBaseSink * basesink);
63 static GstFlowReturn gst_edgesink_render (GstBaseSink * basesink,
64     GstBuffer * buffer);
65 static gboolean gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps);
66
67 static gchar *gst_edgesink_get_host (GstEdgeSink * self);
68 static void gst_edgesink_set_host (GstEdgeSink * self, const gchar * host);
69
70 static guint16 gst_edgesink_get_port (GstEdgeSink * self);
71 static void gst_edgesink_set_port (GstEdgeSink * self, const guint16 port);
72
73 static nns_edge_connect_type_e gst_edgesink_get_connect_type (GstEdgeSink *
74     self);
75 static void gst_edgesink_set_connect_type (GstEdgeSink * self,
76     const nns_edge_connect_type_e connect_type);
77
78 /**
79  * @brief initialize the class
80  */
81 static void
82 gst_edgesink_class_init (GstEdgeSinkClass * klass)
83 {
84   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
85   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
86   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
87
88   gobject_class->set_property = gst_edgesink_set_property;
89   gobject_class->get_property = gst_edgesink_get_property;
90   gobject_class->finalize = gst_edgesink_finalize;
91
92   g_object_class_install_property (gobject_class, PROP_HOST,
93       g_param_spec_string ("host", "Host",
94           "A self host address to accept connection from edgesrc", DEFAULT_HOST,
95           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
96   g_object_class_install_property (gobject_class, PROP_PORT,
97       g_param_spec_uint ("port", "Port",
98           "A self port address to accept connection from edgesrc. "
99           "If the port is set to 0 then the available port is allocated. "
100           "If the connect-type is AITT then the port setting is not required.",
101           0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
102   g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
103       g_param_spec_enum ("connect-type", "Connect Type",
104           "The connections type between edgesink and edgesrc.",
105           GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
106           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
107   g_object_class_install_property (gobject_class, PROP_DEST_HOST,
108       g_param_spec_string ("dest-host", "Destination Host",
109           "The destination hostname of the broker", DEFAULT_MQTT_HOST,
110           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
111   g_object_class_install_property (gobject_class, PROP_DEST_PORT,
112       g_param_spec_uint ("dest-port", "Destination Port",
113           "The destination port of the broker", 0,
114           65535, DEFAULT_MQTT_PORT,
115           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
116   g_object_class_install_property (gobject_class, PROP_TOPIC,
117       g_param_spec_string ("topic", "Topic",
118           "The main topic of the host and option if necessary. "
119           "(topic)/(optional topic for main topic).", "",
120           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
121   g_object_class_install_property (gobject_class, PROP_WAIT_CONNECTION,
122       g_param_spec_boolean ("wait-connection", "Wait connection to edgesrc",
123           "Wait until edgesink is connected to edgesrc. "
124           "In case of false(default), the buffers entering the edgesink are dropped.",
125           FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
126   g_object_class_install_property (gobject_class, PROP_CONNECTION_TIMEOUT,
127       g_param_spec_uint64 ("connection-timeout",
128           "Timeout for wating a connection",
129           "The timeout (in milliseconds) for waiting a connection to receiver. "
130           "0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0,
131           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132
133   gst_element_class_add_pad_template (gstelement_class,
134       gst_static_pad_template_get (&sinktemplate));
135
136   gst_element_class_set_static_metadata (gstelement_class,
137       "EdgeSink", "Sink/Edge",
138       "Publish incoming streams", "Samsung Electronics Co., Ltd.");
139
140   gstbasesink_class->start = gst_edgesink_start;
141   gstbasesink_class->render = gst_edgesink_render;
142   gstbasesink_class->set_caps = gst_edgesink_set_caps;
143
144   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
145       GST_EDGE_ELEM_NAME_SINK, 0, "Edge sink");
146 }
147
148 /**
149  * @brief initialize the new element
150  */
151 static void
152 gst_edgesink_init (GstEdgeSink * self)
153 {
154   self->host = g_strdup (DEFAULT_HOST);
155   self->port = DEFAULT_PORT;
156   self->dest_host = g_strdup (DEFAULT_HOST);
157   self->dest_port = DEFAULT_PORT;
158   self->topic = NULL;
159   self->connect_type = DEFAULT_CONNECT_TYPE;
160   self->wait_connection = FALSE;
161   self->connection_timeout = 0;
162 }
163
164 /**
165  * @brief set property
166  */
167 static void
168 gst_edgesink_set_property (GObject * object, guint prop_id,
169     const GValue * value, GParamSpec * pspec)
170 {
171   GstEdgeSink *self = GST_EDGESINK (object);
172
173   switch (prop_id) {
174     case PROP_HOST:
175       gst_edgesink_set_host (self, g_value_get_string (value));
176       break;
177     case PROP_PORT:
178       gst_edgesink_set_port (self, g_value_get_uint (value));
179       break;
180     case PROP_DEST_HOST:
181       if (!g_value_get_string (value)) {
182         nns_logw ("dest host property cannot be NULL");
183         break;
184       }
185       g_free (self->dest_host);
186       self->dest_host = g_value_dup_string (value);
187       break;
188     case PROP_DEST_PORT:
189       self->dest_port = g_value_get_uint (value);
190       break;
191     case PROP_CONNECT_TYPE:
192       gst_edgesink_set_connect_type (self, g_value_get_enum (value));
193       break;
194     case PROP_TOPIC:
195       if (!g_value_get_string (value)) {
196         nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
197         break;
198       }
199       g_free (self->topic);
200       self->topic = g_value_dup_string (value);
201       break;
202     case PROP_WAIT_CONNECTION:
203       self->wait_connection = g_value_get_boolean (value);
204       break;
205     case PROP_CONNECTION_TIMEOUT:
206       self->connection_timeout = g_value_get_uint64 (value);
207       break;
208     default:
209       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
210       break;
211   }
212 }
213
214 /**
215  * @brief get property
216  */
217 static void
218 gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
219     GParamSpec * pspec)
220 {
221   GstEdgeSink *self = GST_EDGESINK (object);
222
223   switch (prop_id) {
224     case PROP_HOST:
225       g_value_set_string (value, gst_edgesink_get_host (self));
226       break;
227     case PROP_PORT:
228       g_value_set_uint (value, gst_edgesink_get_port (self));
229       break;
230     case PROP_DEST_HOST:
231       g_value_set_string (value, self->dest_host);
232       break;
233     case PROP_DEST_PORT:
234       g_value_set_uint (value, self->dest_port);
235       break;
236     case PROP_CONNECT_TYPE:
237       g_value_set_enum (value, gst_edgesink_get_connect_type (self));
238       break;
239     case PROP_TOPIC:
240       g_value_set_string (value, self->topic);
241       break;
242     case PROP_WAIT_CONNECTION:
243       g_value_set_boolean (value, self->wait_connection);
244       break;
245     case PROP_CONNECTION_TIMEOUT:
246       g_value_set_uint64 (value, self->connection_timeout);
247       break;
248     default:
249       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
250       break;
251   }
252 }
253
254 /**
255  * @brief finalize the object
256  */
257 static void
258 gst_edgesink_finalize (GObject * object)
259 {
260   GstEdgeSink *self = GST_EDGESINK (object);
261
262   g_free (self->host);
263   self->host = NULL;
264
265   g_free (self->dest_host);
266   self->dest_host = NULL;
267
268   g_free (self->topic);
269   self->topic = NULL;
270
271   if (self->edge_h) {
272     nns_edge_release_handle (self->edge_h);
273     self->edge_h = NULL;
274   }
275
276   G_OBJECT_CLASS (parent_class)->finalize (object);
277 }
278
279 /**
280  * @brief start processing of edgesink
281  */
282 static gboolean
283 gst_edgesink_start (GstBaseSink * basesink)
284 {
285   GstEdgeSink *self = GST_EDGESINK (basesink);
286
287   int ret;
288   char *port = NULL;
289
290   ret =
291       nns_edge_create_handle (NULL, self->connect_type,
292       NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
293
294   if (NNS_EDGE_ERROR_NONE != ret) {
295     nns_loge ("Failed to get nnstreamer edge handle.");
296
297     if (self->edge_h) {
298       nns_edge_release_handle (self->edge_h);
299       self->edge_h = NULL;
300     }
301
302     return FALSE;
303   }
304
305   if (self->host)
306     nns_edge_set_info (self->edge_h, "HOST", self->host);
307   if (self->port > 0) {
308     port = g_strdup_printf ("%u", self->port);
309     nns_edge_set_info (self->edge_h, "PORT", port);
310     g_free (port);
311   }
312   if (self->dest_host)
313     nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
314   if (self->dest_port > 0) {
315     port = g_strdup_printf ("%u", self->dest_port);
316     nns_edge_set_info (self->edge_h, "DEST_PORT", port);
317     g_free (port);
318   }
319   if (self->topic)
320     nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
321
322   if (0 != nns_edge_start (self->edge_h)) {
323     nns_loge
324         ("Failed to start NNStreamer-edge. Please check server IP and port");
325     return FALSE;
326   }
327
328   if (self->wait_connection) {
329     guint64 remaining = self->connection_timeout;
330     if (0 == remaining)
331       remaining = G_MAXUINT64;
332
333     while (remaining >= 10 &&
334         NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
335       if (!self->wait_connection) {
336         nns_logi
337             ("Waiting for connection to edgesrc was canceled by the user.");
338         return FALSE;
339       }
340       g_usleep (10000);
341       remaining -= 10;
342     }
343
344     if (remaining > 0 &&
345         NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
346       g_usleep (remaining * 1000U);
347     }
348
349     if (NNS_EDGE_ERROR_NONE != nns_edge_is_connected (self->edge_h)) {
350       nns_loge ("Failed to connect to edgesrc within timeout: %ju ms",
351           self->connection_timeout);
352       return FALSE;
353     }
354   }
355
356   return TRUE;
357 }
358
359 /**
360  * @brief render buffer, send buffer
361  */
362 static GstFlowReturn
363 gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buffer)
364 {
365   GstEdgeSink *self = GST_EDGESINK (basesink);
366   nns_edge_data_h data_h;
367   guint i, num_mems;
368   int ret;
369   GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
370   GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
371
372   ret = nns_edge_data_create (&data_h);
373   if (ret != NNS_EDGE_ERROR_NONE) {
374     nns_loge ("Failed to create data handle in edgesink");
375     return GST_FLOW_ERROR;
376   }
377
378   num_mems = gst_buffer_n_memory (buffer);
379   for (i = 0; i < num_mems; i++) {
380     mem[i] = gst_buffer_peek_memory (buffer, i);
381     if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
382       nns_loge ("Cannot map the %uth memory in gst-buffer", i);
383       num_mems = i;
384       goto done;
385     }
386     nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
387   }
388
389   nns_edge_send (self->edge_h, data_h);
390   goto done;
391
392 done:
393   if (data_h)
394     nns_edge_data_destroy (data_h);
395
396   for (i = 0; i < num_mems; i++) {
397     gst_memory_unmap (mem[i], &map[i]);
398   }
399
400   return GST_FLOW_OK;
401 }
402
403 /**
404  * @brief An implementation of the set_caps vmethod in GstBaseSinkClass
405  */
406 static gboolean
407 gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps)
408 {
409   GstEdgeSink *sink = GST_EDGESINK (basesink);
410   gchar *caps_str, *prev_caps_str, *new_caps_str;
411   int set_rst;
412
413   caps_str = gst_caps_to_string (caps);
414
415   nns_edge_get_info (sink->edge_h, "CAPS", &prev_caps_str);
416   if (!prev_caps_str) {
417     prev_caps_str = g_strdup ("");
418   }
419   new_caps_str =
420       g_strdup_printf ("%s@edge_sink_caps@%s", prev_caps_str, caps_str);
421   set_rst = nns_edge_set_info (sink->edge_h, "CAPS", new_caps_str);
422
423   g_free (prev_caps_str);
424   g_free (new_caps_str);
425   g_free (caps_str);
426
427   return set_rst == NNS_EDGE_ERROR_NONE;
428 }
429
430 /**
431  * @brief getter for the 'host' property.
432  */
433 static gchar *
434 gst_edgesink_get_host (GstEdgeSink * self)
435 {
436   return self->host;
437 }
438
439 /**
440  * @brief setter for the 'host' property.
441  */
442 static void
443 gst_edgesink_set_host (GstEdgeSink * self, const gchar * host)
444 {
445   if (self->host)
446     g_free (self->host);
447   self->host = g_strdup (host);
448 }
449
450 /**
451  * @brief getter for the 'port' property.
452  */
453 static guint16
454 gst_edgesink_get_port (GstEdgeSink * self)
455 {
456   return self->port;
457 }
458
459 /**
460  * @brief setter for the 'port' property.
461  */
462 static void
463 gst_edgesink_set_port (GstEdgeSink * self, const guint16 port)
464 {
465   self->port = port;
466 }
467
468 /**
469  * @brief getter for the 'connect_type' property.
470  */
471 static nns_edge_connect_type_e
472 gst_edgesink_get_connect_type (GstEdgeSink * self)
473 {
474   return self->connect_type;
475 }
476
477 /**
478  * @brief setter for the 'connect_type' property.
479  */
480 static void
481 gst_edgesink_set_connect_type (GstEdgeSink * self,
482     const nns_edge_connect_type_e connect_type)
483 {
484   self->connect_type = connect_type;
485 }