1 /* SPDX-License-Identifier: LGPL-2.1-only */
3 * Copyright (C) 2022 Samsung Electronics Co., Ltd.
7 * @brief Subscribe and push incoming data to the GStreamer pipeline
8 * @author Yechan Choi <yechan9.choi@samsung.com>
9 * @see http://github.com/nnstreamer/nnstreamer
19 GST_DEBUG_CATEGORY_STATIC (gst_edgesrc_debug);
20 #define GST_CAT_DEFAULT gst_edgesrc_debug
23 * @brief the capabilities of the outputs
25 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
26 GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
29 * @brief edgesrc properties
44 #define gst_edgesrc_parent_class parent_class
45 G_DEFINE_TYPE (GstEdgeSrc, gst_edgesrc, GST_TYPE_BASE_SRC);
47 static void gst_edgesrc_set_property (GObject * object, guint prop_id,
48 const GValue * value, GParamSpec * pspec);
49 static void gst_edgesrc_get_property (GObject * object, guint prop_id,
50 GValue * value, GParamSpec * pspec);
51 static void gst_edgesrc_class_finalize (GObject * object);
53 static gboolean gst_edgesrc_start (GstBaseSrc * basesrc);
54 static GstFlowReturn gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset,
55 guint size, GstBuffer ** out_buf);
57 static gchar *gst_edgesrc_get_dest_host (GstEdgeSrc * self);
58 static void gst_edgesrc_set_dest_host (GstEdgeSrc * self,
59 const gchar * dest_host);
61 static guint16 gst_edgesrc_get_dest_port (GstEdgeSrc * self);
62 static void gst_edgesrc_set_dest_port (GstEdgeSrc * self,
63 const guint16 dest_port);
65 static nns_edge_connect_type_e gst_edgesrc_get_connect_type (GstEdgeSrc * self);
66 static void gst_edgesrc_set_connect_type (GstEdgeSrc * self,
67 const nns_edge_connect_type_e connect_type);
70 * @brief initialize the class
73 gst_edgesrc_class_init (GstEdgeSrcClass * klass)
75 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
76 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
77 GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
79 gobject_class->set_property = gst_edgesrc_set_property;
80 gobject_class->get_property = gst_edgesrc_get_property;
81 gobject_class->finalize = gst_edgesrc_class_finalize;
83 g_object_class_install_property (gobject_class, PROP_HOST,
84 g_param_spec_string ("host", "Host",
85 "A self host address (DEPRECATED, has no effect).", DEFAULT_HOST,
86 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
87 g_object_class_install_property (gobject_class, PROP_PORT,
88 g_param_spec_uint ("port", "Port",
89 "A self port number (DEPRECATED, has no effect).",
90 0, 65535, DEFAULT_PORT,
91 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
92 g_object_class_install_property (gobject_class, PROP_DEST_HOST,
93 g_param_spec_string ("dest-host", "Destination Host",
94 "A host address of edgesink to receive the packets from edgesink",
95 DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
96 g_object_class_install_property (gobject_class, PROP_DEST_PORT,
97 g_param_spec_uint ("dest-port", "Destination Port",
98 "A port of edgesink to receive the packets from edgesink", 0, 65535,
99 DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
100 g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
101 g_param_spec_enum ("connect-type", "Connect Type",
102 "The connections type between edgesink and edgesrc.",
103 GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
104 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
105 g_object_class_install_property (gobject_class, PROP_TOPIC,
106 g_param_spec_string ("topic", "Topic",
107 "The main topic of the host and option if necessary. "
108 "(topic)/(optional topic for main topic).", "",
109 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
111 gst_element_class_add_pad_template (gstelement_class,
112 gst_static_pad_template_get (&srctemplate));
114 gst_element_class_set_static_metadata (gstelement_class,
115 "EdgeSrc", "Source/Edge",
116 "Subscribe and push incoming streams", "Samsung Electronics Co., Ltd.");
118 gstbasesrc_class->start = gst_edgesrc_start;
119 gstbasesrc_class->create = gst_edgesrc_create;
121 GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
122 GST_EDGE_ELEM_NAME_SRC, 0, "Edge src");
126 * @brief initialize edgesrc element
129 gst_edgesrc_init (GstEdgeSrc * self)
131 GstBaseSrc *basesrc = GST_BASE_SRC (self);
133 gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
134 gst_base_src_set_async (basesrc, FALSE);
136 self->dest_host = g_strdup (DEFAULT_HOST);
137 self->dest_port = DEFAULT_PORT;
139 self->msg_queue = g_async_queue_new ();
140 self->connect_type = DEFAULT_CONNECT_TYPE;
144 * @brief set property
147 gst_edgesrc_set_property (GObject * object, guint prop_id, const GValue * value,
150 GstEdgeSrc *self = GST_EDGESRC (object);
154 nns_logw ("host property is deprecated");
157 nns_logw ("port property is deprecated");
160 gst_edgesrc_set_dest_host (self, g_value_get_string (value));
163 gst_edgesrc_set_dest_port (self, g_value_get_uint (value));
165 case PROP_CONNECT_TYPE:
166 gst_edgesrc_set_connect_type (self, g_value_get_enum (value));
169 if (!g_value_get_string (value)) {
170 nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
173 g_free (self->topic);
174 self->topic = g_value_dup_string (value);
177 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
183 * @brief get property
186 gst_edgesrc_get_property (GObject * object, guint prop_id, GValue * value,
189 GstEdgeSrc *self = GST_EDGESRC (object);
193 nns_logw ("host property is deprecated");
196 nns_logw ("port property is deprecated");
199 g_value_set_string (value, gst_edgesrc_get_dest_host (self));
202 g_value_set_uint (value, gst_edgesrc_get_dest_port (self));
204 case PROP_CONNECT_TYPE:
205 g_value_set_enum (value, gst_edgesrc_get_connect_type (self));
208 g_value_set_string (value, self->topic);
211 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
217 * @brief finalize the object
220 gst_edgesrc_class_finalize (GObject * object)
222 GstEdgeSrc *self = GST_EDGESRC (object);
223 nns_edge_data_h data_h;
225 g_free (self->dest_host);
226 self->dest_host = NULL;
228 g_free (self->topic);
231 if (self->msg_queue) {
232 while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
233 nns_edge_data_destroy (data_h);
235 g_async_queue_unref (self->msg_queue);
236 self->msg_queue = NULL;
240 nns_edge_release_handle (self->edge_h);
243 G_OBJECT_CLASS (parent_class)->finalize (object);
247 * @brief nnstreamer-edge event callback.
250 _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
252 nns_edge_event_e event_type;
253 int ret = NNS_EDGE_ERROR_NONE;
255 GstEdgeSrc *self = GST_EDGESRC (user_data);
256 if (0 != nns_edge_event_get_type (event_h, &event_type)) {
257 nns_loge ("Failed to get event type!");
258 return NNS_EDGE_ERROR_UNKNOWN;
261 switch (event_type) {
262 case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
264 nns_edge_data_h data;
266 nns_edge_event_parse_new_data (event_h, &data);
267 g_async_queue_push (self->msg_queue, data);
270 case NNS_EDGE_EVENT_CONNECTION_CLOSED:
272 nns_edge_disconnect (self->edge_h);
273 ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port);
274 if (NNS_EDGE_ERROR_NONE != ret) {
275 nns_edge_data_h data_h;
276 nns_edge_data_create (&data_h);
277 g_async_queue_push (self->msg_queue, data_h);
289 * @brief start edgesrc, called when state changed null to ready
292 gst_edgesrc_start (GstBaseSrc * basesrc)
294 GstEdgeSrc *self = GST_EDGESRC (basesrc);
300 nns_edge_create_handle (NULL, self->connect_type,
301 NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
303 if (NNS_EDGE_ERROR_NONE != ret) {
304 nns_loge ("Failed to get nnstreamer edge handle.");
307 nns_edge_release_handle (self->edge_h);
315 nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
316 if (self->dest_port > 0) {
317 port = g_strdup_printf ("%u", self->dest_port);
318 nns_edge_set_info (self->edge_h, "DEST_PORT", port);
322 nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
324 nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
326 if (0 != nns_edge_start (self->edge_h)) {
328 ("Failed to start NNStreamer-edge. Please check server IP and port");
332 if (0 != nns_edge_connect (self->edge_h, self->dest_host, self->dest_port)) {
333 nns_loge ("Failed to connect to edge server!");
341 * @brief Create a buffer containing the subscribed data
344 gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size,
345 GstBuffer ** out_buf)
347 GstEdgeSrc *self = GST_EDGESRC (basesrc);
349 nns_edge_data_h data_h;
350 GstBuffer *buffer = NULL;
357 data_h = g_async_queue_pop (self->msg_queue);
360 nns_loge ("Failed to get message from the edgesrc message queue.");
364 ret = nns_edge_data_get_count (data_h, &num_data);
365 if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
366 nns_loge ("Failed to get the number of memories of the edge data.");
370 buffer = gst_buffer_new ();
371 for (i = 0; i < num_data; i++) {
373 nns_size_t data_len = 0;
376 nns_edge_data_get (data_h, i, &data, &data_len);
377 new_data = _g_memdup (data, data_len);
379 gst_buffer_append_memory (buffer,
380 gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data,
386 nns_edge_data_destroy (data_h);
388 if (buffer == NULL) {
389 nns_loge ("Failed to get buffer to push to the edgesrc.");
390 return GST_FLOW_ERROR;
399 * @brief getter for the 'host' property.
402 gst_edgesrc_get_dest_host (GstEdgeSrc * self)
404 return self->dest_host;
408 * @brief setter for the 'host' property.
411 gst_edgesrc_set_dest_host (GstEdgeSrc * self, const gchar * dest_host)
413 g_free (self->dest_host);
414 self->dest_host = g_strdup (dest_host);
418 * @brief getter for the 'port' property.
421 gst_edgesrc_get_dest_port (GstEdgeSrc * self)
423 return self->dest_port;
427 * @brief setter for the 'port' property.
430 gst_edgesrc_set_dest_port (GstEdgeSrc * self, const guint16 dest_port)
432 self->dest_port = dest_port;
436 * @brief getter for the 'connect_type' property.
438 static nns_edge_connect_type_e
439 gst_edgesrc_get_connect_type (GstEdgeSrc * self)
441 return self->connect_type;
445 * @brief setter for the 'connect_type' property.
448 gst_edgesrc_set_connect_type (GstEdgeSrc * self,
449 const nns_edge_connect_type_e connect_type)
451 self->connect_type = connect_type;