[edge] Deprecate port and host property of edgesrc and update description for port...
[platform/upstream/nnstreamer.git] / gst / edge / edge_src.c
1 /* SPDX-License-Identifier: LGPL-2.1-only */
2 /**
3  * Copyright (C) 2022 Samsung Electronics Co., Ltd.
4  *
5  * @file    edge_src.c
6  * @date    02 Aug 2022
7  * @brief   Subscribe and push incoming data to the GStreamer pipeline
8  * @author  Yechan Choi <yechan9.choi@samsung.com>
9  * @see     http://github.com/nnstreamer/nnstreamer
10  * @bug     No known bugs
11  *
12  */
13 #ifdef HAVE_CONFIG_H
14 #include <config.h>
15 #endif
16
17 #include "edge_src.h"
18
19 GST_DEBUG_CATEGORY_STATIC (gst_edgesrc_debug);
20 #define GST_CAT_DEFAULT gst_edgesrc_debug
21
22 /**
23  * @brief the capabilities of the outputs
24  */
25 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
26     GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
27
28 /**
29  * @brief edgesrc properties
30  */
31 enum
32 {
33   PROP_0,
34   PROP_HOST,
35   PROP_PORT,
36   PROP_DEST_HOST,
37   PROP_DEST_PORT,
38   PROP_CONNECT_TYPE,
39   PROP_TOPIC,
40
41   PROP_LAST
42 };
43
44 #define gst_edgesrc_parent_class parent_class
45 G_DEFINE_TYPE (GstEdgeSrc, gst_edgesrc, GST_TYPE_BASE_SRC);
46
47 static void gst_edgesrc_set_property (GObject * object, guint prop_id,
48     const GValue * value, GParamSpec * pspec);
49 static void gst_edgesrc_get_property (GObject * object, guint prop_id,
50     GValue * value, GParamSpec * pspec);
51 static void gst_edgesrc_class_finalize (GObject * object);
52
53 static gboolean gst_edgesrc_start (GstBaseSrc * basesrc);
54 static GstFlowReturn gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset,
55     guint size, GstBuffer ** out_buf);
56
57 static gchar *gst_edgesrc_get_dest_host (GstEdgeSrc * self);
58 static void gst_edgesrc_set_dest_host (GstEdgeSrc * self,
59     const gchar * dest_host);
60
61 static guint16 gst_edgesrc_get_dest_port (GstEdgeSrc * self);
62 static void gst_edgesrc_set_dest_port (GstEdgeSrc * self,
63     const guint16 dest_port);
64
65 static nns_edge_connect_type_e gst_edgesrc_get_connect_type (GstEdgeSrc * self);
66 static void gst_edgesrc_set_connect_type (GstEdgeSrc * self,
67     const nns_edge_connect_type_e connect_type);
68
69 /**
70  * @brief initialize the class
71  */
72 static void
73 gst_edgesrc_class_init (GstEdgeSrcClass * klass)
74 {
75   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
76   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
77   GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
78
79   gobject_class->set_property = gst_edgesrc_set_property;
80   gobject_class->get_property = gst_edgesrc_get_property;
81   gobject_class->finalize = gst_edgesrc_class_finalize;
82
83   g_object_class_install_property (gobject_class, PROP_HOST,
84       g_param_spec_string ("host", "Host",
85           "A self host address (DEPRECATED, has no effect).", DEFAULT_HOST,
86           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
87   g_object_class_install_property (gobject_class, PROP_PORT,
88       g_param_spec_uint ("port", "Port",
89           "A self port number (DEPRECATED, has no effect).",
90           0, 65535, DEFAULT_PORT,
91           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
92   g_object_class_install_property (gobject_class, PROP_DEST_HOST,
93       g_param_spec_string ("dest-host", "Destination Host",
94           "A host address of edgesink to receive the packets from edgesink",
95           DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
96   g_object_class_install_property (gobject_class, PROP_DEST_PORT,
97       g_param_spec_uint ("dest-port", "Destination Port",
98           "A port of edgesink to receive the packets from edgesink", 0, 65535,
99           DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
100   g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
101       g_param_spec_enum ("connect-type", "Connect Type",
102           "The connections type between edgesink and edgesrc.",
103           GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
104           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105   g_object_class_install_property (gobject_class, PROP_TOPIC,
106       g_param_spec_string ("topic", "Topic",
107           "The main topic of the host and option if necessary. "
108           "(topic)/(optional topic for main topic).", "",
109           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
110
111   gst_element_class_add_pad_template (gstelement_class,
112       gst_static_pad_template_get (&srctemplate));
113
114   gst_element_class_set_static_metadata (gstelement_class,
115       "EdgeSrc", "Source/Edge",
116       "Subscribe and push incoming streams", "Samsung Electronics Co., Ltd.");
117
118   gstbasesrc_class->start = gst_edgesrc_start;
119   gstbasesrc_class->create = gst_edgesrc_create;
120
121   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
122       GST_EDGE_ELEM_NAME_SRC, 0, "Edge src");
123 }
124
125 /**
126  * @brief initialize edgesrc element
127  */
128 static void
129 gst_edgesrc_init (GstEdgeSrc * self)
130 {
131   GstBaseSrc *basesrc = GST_BASE_SRC (self);
132
133   gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
134   gst_base_src_set_async (basesrc, FALSE);
135
136   self->dest_host = g_strdup (DEFAULT_HOST);
137   self->dest_port = DEFAULT_PORT;
138   self->topic = NULL;
139   self->msg_queue = g_async_queue_new ();
140   self->connect_type = DEFAULT_CONNECT_TYPE;
141 }
142
143 /**
144  * @brief set property
145  */
146 static void
147 gst_edgesrc_set_property (GObject * object, guint prop_id, const GValue * value,
148     GParamSpec * pspec)
149 {
150   GstEdgeSrc *self = GST_EDGESRC (object);
151
152   switch (prop_id) {
153     case PROP_HOST:
154       nns_logw ("host property is deprecated");
155       break;
156     case PROP_PORT:
157       nns_logw ("port property is deprecated");
158       break;
159     case PROP_DEST_HOST:
160       gst_edgesrc_set_dest_host (self, g_value_get_string (value));
161       break;
162     case PROP_DEST_PORT:
163       gst_edgesrc_set_dest_port (self, g_value_get_uint (value));
164       break;
165     case PROP_CONNECT_TYPE:
166       gst_edgesrc_set_connect_type (self, g_value_get_enum (value));
167       break;
168     case PROP_TOPIC:
169       if (!g_value_get_string (value)) {
170         nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
171         break;
172       }
173       g_free (self->topic);
174       self->topic = g_value_dup_string (value);
175       break;
176     default:
177       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
178       break;
179   }
180 }
181
182 /**
183  * @brief get property
184  */
185 static void
186 gst_edgesrc_get_property (GObject * object, guint prop_id, GValue * value,
187     GParamSpec * pspec)
188 {
189   GstEdgeSrc *self = GST_EDGESRC (object);
190
191   switch (prop_id) {
192     case PROP_HOST:
193       nns_logw ("host property is deprecated");
194       break;
195     case PROP_PORT:
196       nns_logw ("port property is deprecated");
197       break;
198     case PROP_DEST_HOST:
199       g_value_set_string (value, gst_edgesrc_get_dest_host (self));
200       break;
201     case PROP_DEST_PORT:
202       g_value_set_uint (value, gst_edgesrc_get_dest_port (self));
203       break;
204     case PROP_CONNECT_TYPE:
205       g_value_set_enum (value, gst_edgesrc_get_connect_type (self));
206       break;
207     case PROP_TOPIC:
208       g_value_set_string (value, self->topic);
209       break;
210     default:
211       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
212       break;
213   }
214 }
215
216 /**
217  * @brief finalize the object
218  */
219 static void
220 gst_edgesrc_class_finalize (GObject * object)
221 {
222   GstEdgeSrc *self = GST_EDGESRC (object);
223   nns_edge_data_h data_h;
224
225   g_free (self->dest_host);
226   self->dest_host = NULL;
227
228   g_free (self->topic);
229   self->topic = NULL;
230
231   if (self->msg_queue) {
232     while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
233       nns_edge_data_destroy (data_h);
234     }
235     g_async_queue_unref (self->msg_queue);
236     self->msg_queue = NULL;
237   }
238
239   if (self->edge_h) {
240     nns_edge_release_handle (self->edge_h);
241     self->edge_h = NULL;
242   }
243   G_OBJECT_CLASS (parent_class)->finalize (object);
244 }
245
246 /**
247  * @brief nnstreamer-edge event callback.
248  */
249 static int
250 _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
251 {
252   nns_edge_event_e event_type;
253   int ret = NNS_EDGE_ERROR_NONE;
254
255   GstEdgeSrc *self = GST_EDGESRC (user_data);
256   if (0 != nns_edge_event_get_type (event_h, &event_type)) {
257     nns_loge ("Failed to get event type!");
258     return NNS_EDGE_ERROR_UNKNOWN;
259   }
260
261   switch (event_type) {
262     case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
263     {
264       nns_edge_data_h data;
265
266       nns_edge_event_parse_new_data (event_h, &data);
267       g_async_queue_push (self->msg_queue, data);
268       break;
269     }
270     case NNS_EDGE_EVENT_CONNECTION_CLOSED:
271     {
272       nns_edge_disconnect (self->edge_h);
273       ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port);
274       if (NNS_EDGE_ERROR_NONE != ret) {
275         nns_edge_data_h data_h;
276         nns_edge_data_create (&data_h);
277         g_async_queue_push (self->msg_queue, data_h);
278       }
279       break;
280     }
281     default:
282       break;
283   }
284
285   return ret;
286 }
287
288 /**
289  * @brief start edgesrc, called when state changed null to ready
290  */
291 static gboolean
292 gst_edgesrc_start (GstBaseSrc * basesrc)
293 {
294   GstEdgeSrc *self = GST_EDGESRC (basesrc);
295
296   int ret;
297   char *port = NULL;
298
299   ret =
300       nns_edge_create_handle (NULL, self->connect_type,
301       NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
302
303   if (NNS_EDGE_ERROR_NONE != ret) {
304     nns_loge ("Failed to get nnstreamer edge handle.");
305
306     if (self->edge_h) {
307       nns_edge_release_handle (self->edge_h);
308       self->edge_h = NULL;
309     }
310
311     return FALSE;
312   }
313
314   if (self->dest_host)
315     nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
316   if (self->dest_port > 0) {
317     port = g_strdup_printf ("%u", self->dest_port);
318     nns_edge_set_info (self->edge_h, "DEST_PORT", port);
319     g_free (port);
320   }
321   if (self->topic)
322     nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
323
324   nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
325
326   if (0 != nns_edge_start (self->edge_h)) {
327     nns_loge
328         ("Failed to start NNStreamer-edge. Please check server IP and port");
329     return FALSE;
330   }
331
332   if (0 != nns_edge_connect (self->edge_h, self->dest_host, self->dest_port)) {
333     nns_loge ("Failed to connect to edge server!");
334     return FALSE;
335   }
336
337   return TRUE;
338 }
339
340 /**
341  * @brief Create a buffer containing the subscribed data
342  */
343 static GstFlowReturn
344 gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size,
345     GstBuffer ** out_buf)
346 {
347   GstEdgeSrc *self = GST_EDGESRC (basesrc);
348
349   nns_edge_data_h data_h;
350   GstBuffer *buffer = NULL;
351   guint i, num_data;
352   int ret;
353
354   UNUSED (offset);
355   UNUSED (size);
356
357   data_h = g_async_queue_pop (self->msg_queue);
358
359   if (!data_h) {
360     nns_loge ("Failed to get message from the edgesrc message queue.");
361     goto done;
362   }
363
364   ret = nns_edge_data_get_count (data_h, &num_data);
365   if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
366     nns_loge ("Failed to get the number of memories of the edge data.");
367     goto done;
368   }
369
370   buffer = gst_buffer_new ();
371   for (i = 0; i < num_data; i++) {
372     void *data = NULL;
373     nns_size_t data_len = 0;
374     gpointer new_data;
375
376     nns_edge_data_get (data_h, i, &data, &data_len);
377     new_data = _g_memdup (data, data_len);
378
379     gst_buffer_append_memory (buffer,
380         gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data,
381             g_free));
382   }
383
384 done:
385   if (data_h)
386     nns_edge_data_destroy (data_h);
387
388   if (buffer == NULL) {
389     nns_loge ("Failed to get buffer to push to the edgesrc.");
390     return GST_FLOW_ERROR;
391   }
392
393   *out_buf = buffer;
394
395   return GST_FLOW_OK;
396 }
397
398 /**
399  * @brief getter for the 'host' property.
400  */
401 static gchar *
402 gst_edgesrc_get_dest_host (GstEdgeSrc * self)
403 {
404   return self->dest_host;
405 }
406
407 /**
408  * @brief setter for the 'host' property.
409  */
410 static void
411 gst_edgesrc_set_dest_host (GstEdgeSrc * self, const gchar * dest_host)
412 {
413   g_free (self->dest_host);
414   self->dest_host = g_strdup (dest_host);
415 }
416
417 /**
418  * @brief getter for the 'port' property.
419  */
420 static guint16
421 gst_edgesrc_get_dest_port (GstEdgeSrc * self)
422 {
423   return self->dest_port;
424 }
425
426 /**
427  * @brief setter for the 'port' property.
428  */
429 static void
430 gst_edgesrc_set_dest_port (GstEdgeSrc * self, const guint16 dest_port)
431 {
432   self->dest_port = dest_port;
433 }
434
435 /**
436  * @brief getter for the 'connect_type' property.
437  */
438 static nns_edge_connect_type_e
439 gst_edgesrc_get_connect_type (GstEdgeSrc * self)
440 {
441   return self->connect_type;
442 }
443
444 /**
445  * @brief setter for the 'connect_type' property.
446  */
447 static void
448 gst_edgesrc_set_connect_type (GstEdgeSrc * self,
449     const nns_edge_connect_type_e connect_type)
450 {
451   self->connect_type = connect_type;
452 }