- Add new elements: edgesrc and edgesink.
- Modify Test for edgesrc and edgesink.
Signed-off-by: Yechan Choi <yechan9.choi@samsung.com>
Signed-off-by: gichan <gichan2.jang@samsung.com>
* @bug No known bugs
*
*/
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "edge_common.h"
+
+/**
+ * @brief register GEnumValue array for edge protocol property handling
+ */
+GType
+gst_edge_get_connect_type (void)
+{
+ static GType protocol = 0;
+ if (protocol == 0) {
+ static GEnumValue protocols[] = {
+ {NNS_EDGE_CONNECT_TYPE_TCP, "TCP",
+ "Directly sending stream frames via TCP connections."},
+ /** @todo support UDP, MQTT and HYBRID */
+ {0, NULL, NULL},
+ };
+ protocol = g_enum_register_static ("edge_protocol", protocols);
+ }
+
+ return protocol;
+}
#ifndef __GST_EDGE_H__
#define __GST_EDGE_H__
-G_BEGIN_DECLS
-#ifndef UNUSED
-#define UNUSED(expr) do { (void)(sizeof(x), 0); } while (0)
-#endif /* UNUSED */
+#include <glib.h>
+#include <gst/gst.h>
+#include "nnstreamer-edge.h"
+
#ifndef GST_EDGE_PACKAGE
#define GST_EDGE_PACKAGE "GStreamer Edge Plugins"
#endif /* GST_EDGE_PACKAGE */
#define GST_EDGE_ELEM_NAME_SINK "edgesink"
#define GST_EDGE_ELEM_NAME_SRC "edgesrc"
- G_END_DECLS
+#define DEFAULT_HOST "localhost"
+#define DEFAULT_PORT 3000
+#define DEFAULT_CONNECT_TYPE (NNS_EDGE_CONNECT_TYPE_TCP)
+#define GST_TYPE_EDGE_CONNECT_TYPE (gst_edge_get_connect_type ())
+
+G_BEGIN_DECLS
+/**
+ * @brief register GEnumValue array for edge protocol property handling
+ */
+ GType gst_edge_get_connect_type (void);
+
+G_END_DECLS
#endif /* __GST_EDGE_H__ */
/**
* Copyright (C) 2022 Samsung Electronics Co., Ltd.
*
- * @file edge_sink.h
+ * @file edge_elements.c
* @date 02 Aug 2022
* @brief Register edge plugins
* @author Yechan Choi <yechan9.choi@samsung.com>
*/
#include <gst/gst.h>
-#include "edge_common.h"
#include "edge_sink.h"
#include "edge_src.h"
/**
* Copyright (C) 2022 Samsung Electronics Co., Ltd.
*
- * @file edge_sink.h
+ * @file edge_sink.c
* @date 01 Aug 2022
* @brief Publish incoming streams
* @author Yechan Choi <yechan9.choi@samsung.com>
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
+/**
+ * @brief edgesink properties
+ */
enum
{
PROP_0,
- /** @todo define props */
+ PROP_HOST,
+ PROP_PORT,
+ PROP_CONNECT_TYPE,
PROP_LAST
};
G_DEFINE_TYPE (GstEdgeSink, gst_edgesink, GST_TYPE_BASE_SINK);
static void gst_edgesink_set_property (GObject * object,
- guint prop_id, const GValue * value, GParamSepc * pspec);
+ guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_edgesink_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
static gboolean gst_edgesink_start (GstBaseSink * basesink);
static GstFlowReturn gst_edgesink_render (GstBaseSink * basesink,
- GstBuffer * buf);
+ GstBuffer * buffer);
static gboolean gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps);
+static gchar *gst_edgesink_get_host (GstEdgeSink * self);
+static void gst_edgesink_set_host (GstEdgeSink * self, const gchar * host);
+
+static guint16 gst_edgesink_get_port (GstEdgeSink * self);
+static void gst_edgesink_set_port (GstEdgeSink * self, const guint16 port);
+
+static nns_edge_connect_type_e gst_edgesink_get_connect_type (GstEdgeSink *
+ self);
+static void gst_edgesink_set_connect_type (GstEdgeSink * self,
+ const nns_edge_connect_type_e connect_type);
+
/**
* @brief initialize the class
*/
gst_edgesink_class_init (GstEdgeSinkClass * klass)
{
GObjectClass *gobject_class;
- GstElementClass *gselement_class;
+ GstElementClass *gstelement_class;
GstBaseSinkClass *gstbasesink_class;
gstbasesink_class = (GstBaseSinkClass *) klass;
gobject_class->get_property = gst_edgesink_get_property;
gobject_class->finalize = gst_edgesink_finalize;
- /** @todo set props */
+ g_object_class_install_property (gobject_class, PROP_HOST,
+ g_param_spec_string ("host", "Host",
+ "A self host address to accept connection from edgesrc", DEFAULT_HOST,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_PORT,
+ g_param_spec_uint ("port", "Port",
+ "A self port address to accept connection from edgesrc.",
+ 0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
+ g_param_spec_enum ("connect-type", "Connect Type",
+ "The connections type between edgesink and edgesrc.",
+ GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
* @brief initialize the new element
*/
static void
-gst_edgesink_init (GstEdgeSink * sink)
+gst_edgesink_init (GstEdgeSink * self)
{
- /** @todo set default value of props */
-}
-
-
-/**
- * @brief finalize the object
- */
-static void
-gst_edgesink_finalize (GObject * object)
-{
- GstEdgeSink *self = GST_EDGESINK (object);
- /** @todo finalize - free all pointer in element */
- G_OBJECT_CLASS (parent_class)->finalize (object);
+ self->host = g_strdup (DEFAULT_HOST);
+ self->port = DEFAULT_PORT;
+ self->connect_type = DEFAULT_CONNECT_TYPE;
}
/**
GstEdgeSink *self = GST_EDGESINK (object);
switch (prop_id) {
- /** @todo set prop */
+ case PROP_HOST:
+ gst_edgesink_set_host (self, g_value_get_string (value));
+ break;
+ case PROP_PORT:
+ gst_edgesink_set_port (self, g_value_get_uint (value));
+ break;
+ case PROP_CONNECT_TYPE:
+ gst_edgesink_set_connect_type (self, g_value_get_enum (value));
+ break;
+
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
*/
static void
gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
- GParmSpec * pspec)
+ GParamSpec * pspec)
{
- GstEdgeSInk *self = GST_EDGESINK (object);
+ GstEdgeSink *self = GST_EDGESINK (object);
switch (prop_id) {
- /** @todo props */
+ case PROP_HOST:
+ g_value_set_string (value, gst_edgesink_get_host (self));
+ break;
+ case PROP_PORT:
+ g_value_set_uint (value, gst_edgesink_get_port (self));
+ break;
+ case PROP_CONNECT_TYPE:
+ g_value_set_enum (value, gst_edgesink_get_connect_type (self));
+ break;
+
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
/**
+ * @brief finalize the object
+ */
+static void
+gst_edgesink_finalize (GObject * object)
+{
+ GstEdgeSink *self = GST_EDGESINK (object);
+ if (self->host) {
+ g_free (self->host);
+ self->host = NULL;
+ }
+
+ if (self->edge_h) {
+ nns_edge_release_handle (self->edge_h);
+ self->edge_h = NULL;
+ }
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/**
* @brief start processing of edgesink
*/
static gboolean
gst_edgesink_start (GstBaseSink * basesink)
{
- GstEdgeSink *sink = GST_EDGESINK (basesink);
+ GstEdgeSink *self = GST_EDGESINK (basesink);
+
+ int ret;
+ char *port = NULL;
+
+ ret =
+ nns_edge_create_handle ("TEMP_ID", self->connect_type,
+ NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
+
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_loge ("Failed to get nnstreamer edge handle.");
+
+ if (self->edge_h) {
+ nns_edge_release_handle (self->edge_h);
+ self->edge_h = NULL;
+ }
+
+ return FALSE;
+ }
+
+ nns_edge_set_info (self->edge_h, "HOST", self->host);
+ port = g_strdup_printf ("%d", self->port);
+ nns_edge_set_info (self->edge_h, "PORT", port);
+ g_free (port);
+
+ if (0 != nns_edge_start (self->edge_h)) {
+ nns_loge
+ ("Failed to start NNStreamer-edge. Please check server IP and port");
+ return FALSE;
+ }
- /** @todo start */
+ return TRUE;
}
/**
* @brief render buffer, send buffer
*/
static GstFlowReturn
-gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buf)
+gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buffer)
{
- /** @todo render, send data */
+ GstEdgeSink *self = GST_EDGESINK (basesink);
+ nns_edge_data_h data_h;
+ guint i, num_mems;
+ int ret;
+ GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
+ GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
+
+ ret = nns_edge_data_create (&data_h);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_loge ("Failed to create data handle in edgesink");
+ return GST_FLOW_ERROR;
+ }
+
+ num_mems = gst_buffer_n_memory (buffer);
+ for (i = 0; i < num_mems; i++) {
+ mem[i] = gst_buffer_peek_memory (buffer, i);
+ if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
+ nns_loge ("Cannot map the %uth memory in gst-buffer", i);
+ num_mems = i;
+ goto done;
+ }
+ nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
+ }
+
+ nns_edge_send (self->edge_h, data_h);
+ goto done;
+
+done:
+ if (data_h)
+ nns_edge_data_destroy (data_h);
+
+ for (i = 0; i < num_mems; i++) {
+ gst_memory_unmap (mem[i], &map[i]);
+ }
+
+ return GST_FLOW_OK;
}
/**
static gboolean
gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps)
{
- /** @todo set caps */
+ GstEdgeSink *sink = GST_EDGESINK (basesink);
+ gchar *caps_str, *prev_caps_str, *new_caps_str;
+ int set_rst;
+
+ caps_str = gst_caps_to_string (caps);
+
+ nns_edge_get_info (sink->edge_h, "CAPS", &prev_caps_str);
+ if (!prev_caps_str) {
+ prev_caps_str = g_strdup ("");
+ }
+ new_caps_str =
+ g_strdup_printf ("%s@edge_sink_caps@%s", prev_caps_str, caps_str);
+ set_rst = nns_edge_set_info (sink->edge_h, "CAPS", new_caps_str);
+
+ g_free (prev_caps_str);
+ g_free (new_caps_str);
+ g_free (caps_str);
+
+ return set_rst == NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief getter for the 'host' property.
+ */
+static gchar *
+gst_edgesink_get_host (GstEdgeSink * self)
+{
+ return self->host;
+}
+
+/**
+ * @brief setter for the 'host' property.
+ */
+static void
+gst_edgesink_set_host (GstEdgeSink * self, const gchar * host)
+{
+ if (self->host)
+ g_free (self->host);
+ self->host = g_strdup (host);
+}
+
+/**
+ * @brief getter for the 'port' property.
+ */
+static guint16
+gst_edgesink_get_port (GstEdgeSink * self)
+{
+ return self->port;
+}
+
+/**
+ * @brief setter for the 'port' property.
+ */
+static void
+gst_edgesink_set_port (GstEdgeSink * self, const guint16 port)
+{
+ self->port = port;
+}
+
+/**
+ * @brief getter for the 'connect_type' property.
+ */
+static nns_edge_connect_type_e
+gst_edgesink_get_connect_type (GstEdgeSink * self)
+{
+ return self->connect_type;
+}
+
+/**
+ * @brief setter for the 'connect_type' property.
+ */
+static void
+gst_edgesink_set_connect_type (GstEdgeSink * self,
+ const nns_edge_connect_type_e connect_type)
+{
+ self->connect_type = connect_type;
}
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
+#include "edge_common.h"
+#include "nnstreamer-edge.h"
+#include "../nnstreamer/nnstreamer_log.h"
+#include "tensor_typedef.h"
G_BEGIN_DECLS
#define GST_TYPE_EDGESINK \
{
GstBaseSink element;
+ gchar *host;
+ guint16 port;
+
+ nns_edge_connect_type_e connect_type;
+ nns_edge_h edge_h;
};
/**
{
PROP_0,
- /** @todo define props */
+ PROP_DEST_HOST,
+ PROP_DEST_PORT,
+ PROP_CONNECT_TYPE,
PROP_LAST
-}
+};
#define gst_edgesrc_parent_class parent_class
-G_DEFINE_TYPE (GstEdgeSrc, gst_edgesrc, GST_TYPE_PUSH_SRC);
+G_DEFINE_TYPE (GstEdgeSrc, gst_edgesrc, GST_TYPE_BASE_SRC);
static void gst_edgesrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
GValue * value, GParamSpec * pspec);
static void gst_edgesrc_class_finalize (GObject * object);
-static GstStateChangeReturn gst_edgesrc_change_state (GstElement * element,
- GstStateChange transition);
-
static gboolean gst_edgesrc_start (GstBaseSrc * basesrc);
-static gboolean gst_edgesrc_stop (GstBaseSrc * basesrc);
-static GstCaps *gst_edgesrc_get_caps (GstBaseSrc * basesrc, GstCaps * filter);
-static void gst_edgesrc_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
- GstClockTime * start, GstClockTime * end);
-static gboolean gst_edgesrc_is_seekable (GstBaseSrc * basesrc);
static GstFlowReturn gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset,
- guint size, GstBuffer ** buf);
-static gboolean gst_edgesrc_query (GstBaseSrc * basesrc, GstQuery * query);
+ guint size, GstBuffer ** out_buf);
+
+static gchar *gst_edgesrc_get_dest_host (GstEdgeSrc * self);
+static void gst_edgesrc_set_dest_host (GstEdgeSrc * self,
+ const gchar * dest_host);
+
+static guint16 gst_edgesrc_get_dest_port (GstEdgeSrc * self);
+static void gst_edgesrc_set_dest_port (GstEdgeSrc * self,
+ const guint16 dest_port);
+
+static nns_edge_connect_type_e gst_edgesrc_get_connect_type (GstEdgeSrc * self);
+static void gst_edgesrc_set_connect_type (GstEdgeSrc * self,
+ const nns_edge_connect_type_e connect_type);
/**
* @brief initialize the class
gobject_class->get_property = gst_edgesrc_get_property;
gobject_class->finalize = gst_edgesrc_class_finalize;
- /** @todo set props */
+ g_object_class_install_property (gobject_class, PROP_DEST_HOST,
+ g_param_spec_string ("dest-host", "Destination Host",
+ "A host address of edgesink to receive the packets from edgesink",
+ DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_DEST_PORT,
+ g_param_spec_uint ("dest-port", "Destination Port",
+ "A port of edgesink to receive the packets from edgesink",
+ 0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
+ g_param_spec_enum ("connect-type", "Connect Type",
+ "The connections type between edgesink and edgesrc.",
+ GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&srctemplate));
- gst_element_class_set_static_metadata (gelement_class,
+ gst_element_class_set_static_metadata (gstelement_class,
"EdgeSrc", "Source/Edge",
"Subscribe and push incoming streams", "Samsung Electronics Co., Ltd.");
- gstelement_class->change_state = gst_edgesrc_change_state;
-
gstbasesrc_class->start = gst_edgesrc_start;
- gstbasesrc_class->stop = gst_edgesrc_stop;
- gstbasesrc_class->get_caps = gst_edgesrc_get_caps;
- gstbasesrc_class->get_times = gst_edgesrc_get_times;
- gstbasesrc_class->is_seekable = gst_edgesrc_is_seekable;
gstbasesrc_class->create = gst_edgesrc_create;
- gstbasesrc_class->query = gst_edgesrc_query;
GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
GST_EDGE_ELEM_NAME_SRC, 0, "Edge src");
gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
gst_base_src_set_async (basesrc, FALSE);
- /** @todo set default value of props */
+ self->dest_host = g_strdup (DEFAULT_HOST);
+ self->dest_port = DEFAULT_PORT;
+ self->msg_queue = g_async_queue_new ();
+ self->connect_type = DEFAULT_CONNECT_TYPE;
}
/**
{
GstEdgeSrc *self = GST_EDGESRC (object);
- switch (prod_id) {
- /** @todo set prop */
+ switch (prop_id) {
+ case PROP_DEST_HOST:
+ gst_edgesrc_set_dest_host (self, g_value_get_string (value));
+ break;
+ case PROP_DEST_PORT:
+ gst_edgesrc_set_dest_port (self, g_value_get_uint (value));
+ break;
+ case PROP_CONNECT_TYPE:
+ gst_edgesrc_set_connect_type (self, g_value_get_enum (value));
+ break;
+
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
GstEdgeSrc *self = GST_EDGESRC (object);
switch (prop_id) {
- /** @todo props */
+ case PROP_DEST_HOST:
+ g_value_set_string (value, gst_edgesrc_get_dest_host (self));
+ break;
+ case PROP_DEST_PORT:
+ g_value_set_uint (value, gst_edgesrc_get_dest_port (self));
+ break;
+ case PROP_CONNECT_TYPE:
+ g_value_set_enum (value, gst_edgesrc_get_connect_type (self));
+ break;
+
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
gst_edgesrc_class_finalize (GObject * object)
{
GstEdgeSrc *self = GST_EDGESRC (object);
- /** @todo finalize - free all pointer in element */
+ nns_edge_data_h data_h;
+
+ if (self->dest_host) {
+ g_free (self->dest_host);
+ self->dest_host = NULL;
+ }
+
+ if (self->msg_queue) {
+ while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
+ nns_edge_data_destroy (data_h);
+ }
+ g_async_queue_unref (self->msg_queue);
+ self->msg_queue = NULL;
+ }
+
+ if (self->edge_h) {
+ nns_edge_release_handle (self->edge_h);
+ self->edge_h = NULL;
+ }
G_OBJECT_CLASS (parent_class)->finalize (object);
}
/**
- * @brief handle edgesrc's state change
+ * @brief nnstreamer-edge event callback.
*/
-static GstStateChangeReturn
-gst_edgesrc_change_state (GstElement * element, GstStateChange transition)
+static int
+_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
{
- GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+ nns_edge_event_e event_type;
+ int ret = NNS_EDGE_ERROR_NONE;
- GstEdgeSrc *self = GST_EDGESRC (element);
-
- /** @todo handle transition */
+ GstEdgeSrc *self = GST_EDGESRC (user_data);
+ if (0 != nns_edge_event_get_type (event_h, &event_type)) {
+ nns_loge ("Failed to get event type!");
+ return NNS_EDGE_ERROR_UNKNOWN;
+ }
- ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ switch (event_type) {
+ case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+ {
+ nns_edge_data_h data;
- /** @todo handle transition */
+ nns_edge_event_parse_new_data (event_h, &data);
+ g_async_queue_push (self->msg_queue, data);
+ break;
+ }
+ default:
+ break;
+ }
return ret;
}
{
GstEdgeSrc *self = GST_EDGESRC (basesrc);
- /** @todo start */
+ int ret;
+ char *port = NULL;
+
+ ret =
+ nns_edge_create_handle ("TEMP_ID", self->connect_type,
+ NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
+
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_loge ("Failed to get nnstreamer edge handle.");
+
+ if (self->edge_h) {
+ nns_edge_release_handle (self->edge_h);
+ self->edge_h = NULL;
+ }
+
+ return FALSE;
+ }
+
+ nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
+ port = g_strdup_printf ("%d", self->dest_port);
+ nns_edge_set_info (self->edge_h, "DEST_PORT", port);
+ g_free (port);
+
+ nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
+
+ if (0 != nns_edge_start (self->edge_h)) {
+ nns_loge
+ ("Failed to start NNStreamer-edge. Please check server IP and port");
+ return FALSE;
+ }
+
+ if (0 != nns_edge_connect (self->edge_h, self->dest_host, self->dest_port)) {
+ nns_loge ("Failed to connect to edge server!");
+ return FALSE;
+ }
+
+ return TRUE;
}
/**
- * @brief stop edgesrc, called when state changed ready to null
+ * @brief Create a buffer containing the subscribed data
*/
-static gboolean
-gst_edgesrc_stop (GstBaseSrc * basesrc)
+static GstFlowReturn
+gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size,
+ GstBuffer ** out_buf)
{
GstEdgeSrc *self = GST_EDGESRC (basesrc);
- /** @todo stop */
+ nns_edge_data_h data_h;
+ GstBuffer *buffer = NULL;
+ guint i, num_data;
+ int ret;
+
+ UNUSED (offset);
+ UNUSED (size);
+
+ data_h = g_async_queue_pop (self->msg_queue);
+
+ if (!data_h) {
+ nns_loge ("Failed to get message from the edgesrc message queue.");
+ goto done;
+ }
+
+ ret = nns_edge_data_get_count (data_h, &num_data);
+ if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
+ nns_loge ("Failed to get the number of memories of the edge data.");
+ goto done;
+ }
+
+ buffer = gst_buffer_new ();
+ for (i = 0; i < num_data; i++) {
+ void *data = NULL;
+ size_t data_len = 0;
+ gpointer new_data;
+
+ nns_edge_data_get (data_h, i, &data, &data_len);
+ new_data = _g_memdup (data, data_len);
+
+ gst_buffer_append_memory (buffer,
+ gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data,
+ g_free));
+ }
+
+done:
+ if (data_h)
+ nns_edge_data_destroy (data_h);
+
+ if (buffer == NULL) {
+ nns_loge ("Failed to get buffer to push to the edgesrc.");
+ return GST_FLOW_ERROR;
+ }
+
+ *out_buf = buffer;
+
+ return GST_FLOW_OK;
}
/**
- * @brief Get caps of subclass
+ * @brief getter for the 'host' property.
*/
-static GstCaps *
-gst_edgesrc_get_caps (GstBaseSrc * basesrc, GstCaps * filter)
+static gchar *
+gst_edgesrc_get_dest_host (GstEdgeSrc * self)
{
- /** @todo get caps */
+ return self->dest_host;
}
/**
- * @brief Return the time information of the given buffer
+ * @brief setter for the 'host' property.
*/
static void
-gst_edgesrc_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
- GstClockTime * start, GstClockTime * end)
+gst_edgesrc_set_dest_host (GstEdgeSrc * self, const gchar * dest_host)
{
- /** @todo get times */
+ g_free (self->dest_host);
+ self->dest_host = g_strdup (dest_host);
}
/**
- * @brief Check if source supports seeking
+ * @brief getter for the 'port' property.
*/
-static gboolean
-gst_edgesrc_is_seekable (GstBaseSrc * basesrc)
+static guint16
+gst_edgesrc_get_dest_port (GstEdgeSrc * self)
{
- /** @todo is seekable */
+ return self->dest_port;
}
/**
- * @brief Create a buffer containing the subscribed data
+ * @brief setter for the 'port' property.
*/
-static GstFlowReturn
-gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size,
- GstBuffer ** buf)
+static void
+gst_edgesrc_set_dest_port (GstEdgeSrc * self, const guint16 dest_port)
{
- GstEdgeSrc *self = GST_EDGESRC (basesrc);
+ self->dest_port = dest_port;
+}
- /** @todo create */
+/**
+ * @brief getter for the 'connect_type' property.
+ */
+static nns_edge_connect_type_e
+gst_edgesrc_get_connect_type (GstEdgeSrc * self)
+{
+ return self->connect_type;
}
/**
- * @brief An implementation of the GstBaseSrc vmethod that handles queries
+ * @brief setter for the 'connect_type' property.
*/
-static gboolean
-gst_edgesrc_query (GstBaseSrc * basesrc, GstQuery * query)
+static void
+gst_edgesrc_set_connect_type (GstEdgeSrc * self,
+ const nns_edge_connect_type_e connect_type)
{
- /** @todo query */
+ self->connect_type = connect_type;
}
#ifndef __GST_EDGE_SRC_H__
#define __GST_EDGE_SRC_H__
+#include <gst/gst.h>
#include <gst/base/gstbasesrc.h>
+#include "edge_common.h"
+#include "nnstreamer-edge.h"
+#include "nnstreamer_util.h"
+#include "../nnstreamer/nnstreamer_log.h"
G_BEGIN_DECLS
#define GST_TYPE_EDGESRC \
(G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_EDGESRC))
#define GST_EDGESRC_CAST(obj) ((GstEdgeSrc *) (obj))
typedef struct _GstEdgeSrc GstEdgeSrc;
-typedef struct _GstEdgeSrcClass GstEdgeSinkClass;
+typedef struct _GstEdgeSrcClass GstEdgeSrcClass;
/**
* @brief GstEdgeSrc data structure.
struct _GstEdgeSrc
{
GstBaseSrc element;
-}
+
+ gchar *dest_host;
+ guint16 dest_port;
+
+ nns_edge_connect_type_e connect_type;
+ nns_edge_h edge_h;
+ GAsyncQueue *msg_queue;
+};
/**
* @brief GstEdgeSrcClass data structure.
struct _GstEdgeSrcClass
{
GstBaseSrcClass parent_class;
-}
+};
GType gst_edgesrc_get_type (void);
edge_srcs = []
edge_dep = [
- glib_dep,
+ glib_dep,
gst_base_dep,
gst_dep,
+ nnstreamer_edge_support_deps
]
+if build_platform == 'tizen'
+ edge_dep += dlog_dep
+elif cc.has_header_symbol('android/log.h', '__android_log_print')
+ edge_dep += cc.find_library('log')
+endif
+
foreach s : edge_files
edge_srcs += join_paths(meson.current_source_dir(), s)
endforeach
-subdir('edge')
+if nnstreamer_edge_support_is_available
+ subdir('edge')
+endif
subdir('join')
if mqtt_support_is_available
subdir('mqtt')
# log utils
# TODO: Define proper variable for android in entire project
if build_platform == 'tizen'
- nnstreamer_single_deps += dependency('dlog')
+ nnstreamer_single_deps += dlog_dep
elif cc.has_header_symbol('android/log.h', '__android_log_print')
nnstreamer_single_deps += cc.find_library('log')
endif
tizenVmajor = get_option('tizen-version-major')
add_project_arguments('-DTIZENVERSION='+tizenVmajor.to_string(), language: ['c', 'cpp'])
+ dlog_dep = dependency('dlog')
elif not meson.is_cross_build()
if cc.get_id() == 'clang' and cxx.get_id() == 'clang'
if build_machine.system() == 'darwin'
%{_prefix}/lib/nnstreamer/decoders/libnnstreamer_decoder_octet_stream.so
%{_prefix}/lib/nnstreamer/filters/libnnstreamer_filter_cpp.so
%{gstlibdir}/libnnstreamer.so
+%{gstlibdir}/libgstedge.so
%{_libdir}/libnnstreamer.so
%files single
callCompareTest $1 $2 $3 "$4" $5 $6
}
+## @brief Find the number of the first file that matches
+## @param $1 Raw file prefix
+## @param $2 Raw file suffix
+## @param $3 Result file name
+## @param $4 Test Case ID
+function findFirstMatchedFileNumber() {
+ num=-1
+ result_file="$3"
+
+ if [[ ! -f "$result_file" ]]; then
+ echo "$3 don't exist."
+ return
+ fi
+
+ command -v cmp
+
+ if [[ $? == 0 && ! -L $(which cmp) ]]; then
+ cmp_exist=1
+ else
+ cmp_exist=0
+ fi
+
+ while :
+ do
+ num=$((num+1))
+ raw_file="$1$num$2"
+
+ if [[ ! -f "$raw_file" ]]; then
+ echo "$raw_file don't exist."
+ num=-1
+ return
+ fi
+
+ if [[ "$cmp_exist" -eq "1" ]]; then
+ # use cmp - callCompareTest is too verbose
+ cmp $raw_file $result_file
+ output=$?
+ if [[ "$output" -eq "1" ]]; then
+ output=0
+ else
+ output=1
+ fi
+ else
+ # use `callCompareTest` if cmp is not exist
+ callCompareTest $raw_file $result_file "$4-$num" "find the number of the first file that matches for test $4" 0 1
+ fi
+
+ if [[ "$output" -eq "1" ]]; then
+ return
+ fi
+ done
+}
+
# Run edge sink server as echo server with default address option.
PORT=`python3 ../../get_available_port.py`
gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
t. ! queue ! multifilesink location=raw1_%1d.log \
t. ! queue ! edgesink port=${PORT} async=false" 1-1 0 0 30
gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
- edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result1_%1d.log" 1-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw1_0.log result1_0.log 1-3 "Compare 1-3" 1 0
-callCompareTestIfExist raw1_1.log result1_1.log 1-4 "Compare 1-4" 1 0
-callCompareTestIfExist raw1_2.log result1_2.log 1-5 "Compare 1-5" 1 0
-callCompareTestIfExist raw1_3.log result1_3.log 1-6 "Compare 1-6" 1 0
-callCompareTestIfExist raw1_4.log result1_4.log 1-7 "Compare 1-7" 1 0
-callCompareTestIfExist raw1_5.log result1_5.log 1-8 "Compare 1-8" 1 0
-callCompareTestIfExist raw1_6.log result1_6.log 1-9 "Compare 1-9" 1 0
-callCompareTestIfExist raw1_7.log result1_7.log 1-10 "Compare 1-10" 1 0
-callCompareTestIfExist raw1_8.log result1_8.log 1-11 "Compare 1-11" 1 0
-callCompareTestIfExist raw1_9.log result1_9.log 1-12 "Compare 1-12" 1 0
+ edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result1_%1d.log" 1-2 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw1_" ".log" "result1_0.log" 1-3
+callCompareTestIfExist raw1_$((num+0)).log result1_0.log 1-3 "Compare 1-3" 1 0
+callCompareTestIfExist raw1_$((num+1)).log result1_1.log 1-4 "Compare 1-4" 1 0
+callCompareTestIfExist raw1_$((num+2)).log result1_2.log 1-5 "Compare 1-5" 1 0
+callCompareTestIfExist raw1_$((num+3)).log result1_3.log 1-6 "Compare 1-6" 1 0
+callCompareTestIfExist raw1_$((num+4)).log result1_4.log 1-7 "Compare 1-7" 1 0
+callCompareTestIfExist raw1_$((num+5)).log result1_5.log 1-8 "Compare 1-8" 1 0
+callCompareTestIfExist raw1_$((num+6)).log result1_6.log 1-9 "Compare 1-9" 1 0
+callCompareTestIfExist raw1_$((num+7)).log result1_7.log 1-10 "Compare 1-10" 1 0
+callCompareTestIfExist raw1_$((num+8)).log result1_8.log 1-11 "Compare 1-11" 1 0
+callCompareTestIfExist raw1_$((num+9)).log result1_9.log 1-12 "Compare 1-12" 1 0
kill -9 $pid &> /dev/null
wait $pid
t. ! queue ! multifilesink location=raw2_%1d.log \
t. ! queue ! edgesink port=${PORT} async=false" 2-1 0 0 30
gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
- edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result2_0_%1d.log" 2-2 0 0 $PERFORMANCE
+ edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result2_0_%1d.log" 2-2 0 0 $PERFORMANCE
gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
- edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result2_1_%1d.log" 2-3 0 0 $PERFORMANCE
-callCompareTestIfExist raw2_0.log result2_0_0.log 2-4 "Compare 2-4" 1 0
-callCompareTestIfExist raw2_0.log result2_1_0.log 2-5 "Compare 2-5" 1 0
-callCompareTestIfExist raw2_1.log result2_0_1.log 2-6 "Compare 2-6" 1 0
-callCompareTestIfExist raw2_1.log result2_1_1.log 2-7 "Compare 2-7" 1 0
-callCompareTestIfExist raw2_2.log result2_0_2.log 2-8 "Compare 2-8" 1 0
-callCompareTestIfExist raw2_2.log result2_1_2.log 2-9 "Compare 2-9" 1 0
+ edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result2_1_%1d.log" 2-3 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw2_" ".log" "result2_0_0.log" 2-4
+callCompareTestIfExist raw2_$((num+0)).log result2_0_0.log 2-4 "Compare 2-4" 1 0
+callCompareTestIfExist raw2_$((num+1)).log result2_0_1.log 2-5 "Compare 2-5" 1 0
+callCompareTestIfExist raw2_$((num+2)).log result2_0_2.log 2-6 "Compare 2-6" 1 0
+findFirstMatchedFileNumber "raw2_" ".log" "result2_1_0.log" 2-7
+callCompareTestIfExist raw2_$((num+0)).log result2_1_0.log 2-7 "Compare 2-7" 1 0
+callCompareTestIfExist raw2_$((num+1)).log result2_1_1.log 2-8 "Compare 2-8" 1 0
+callCompareTestIfExist raw2_$((num+2)).log result2_1_2.log 2-9 "Compare 2-9" 1 0
kill -9 $pid &> /dev/null
wait $pid
t. ! queue ! multifilesink location=raw3_%1d.log \
t. ! queue ! edgesink port=${PORT} async=false" 3-1 0 0 30
gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
- edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result3_%1d.log" 3-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw3_0.log result3_0.log 3-3 "Compare 3-3" 1 0
-callCompareTestIfExist raw3_1.log result3_1.log 3-4 "Compare 3-4" 1 0
-callCompareTestIfExist raw3_2.log result3_2.log 3-5 "Compare 3-5" 1 0
+ edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result3_%1d.log" 3-2 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw3_" ".log" "result3_0.log" 3-3
+callCompareTestIfExist raw3_$((num+0)).log result3_0.log 3-3 "Compare 3-3" 1 0
+callCompareTestIfExist raw3_$((num+1)).log result3_1.log 3-4 "Compare 3-4" 1 0
+callCompareTestIfExist raw3_$((num+2)).log result3_2.log 3-5 "Compare 3-5" 1 0
kill -9 $pid &> /dev/null
wait $pid
gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
videotestsrc is-live=true ! videoconvert ! videoscale ! video/x-raw,width=300,height=300,format=RGB ! tensor_converter ! other/tensors,num_tensors=1,dimensions=3:300:300:1,types=uint8,format=static ! tee name=t \
t. ! queue ! multifilesink location=raw4_%1d.log \
- t. ! queue ! edgesink port=${PORT} asnyc=false" 4-1 0 0 30
+ t. ! queue ! edgesink port=${PORT} async=false" 4-1 0 0 30
gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
- edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result4_%1d.log" 4-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw4_0.log result4_0.log 4-3 "Compare 4-3" 1 0
-callCompareTestIfExist raw4_1.log result4_1.log 4-4 "Compare 4-4" 1 0
-callCompareTestIfExist raw4_2.log result4_2.log 4-5 "Compare 4-5" 1 0
+ edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result4_%1d.log" 4-2 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw4_" ".log" "result4_0.log" 4-3
+callCompareTestIfExist raw4_$((num+0)).log result4_0.log 4-3 "Compare 4-3" 1 0
+callCompareTestIfExist raw4_$((num+1)).log result4_1.log 4-4 "Compare 4-4" 1 0
+callCompareTestIfExist raw4_$((num+2)).log result4_2.log 4-5 "Compare 4-5" 1 0
kill -9 $pid &> /dev/null
wait $pid
t. ! queue ! multifilesink location=raw5_%1d.log \
t. ! queue ! edgesink port=${PORT} async=false" 5-1 0 0 30
gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
- edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result5_%1d.log" 5-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw5_0.log result5_0.log 5-3 "Compare 5-3" 1 0
-callCompareTestIfExist raw5_1.log result5_1.log 5-4 "Compare 5-4" 1 0
-callCompareTestIfExist raw5_2.log result5_2.log 5-5 "Compare 5-5" 1 0
-kill -9 $pid &> /dev/null
-wait $pid
-
-# Cap: Text
-PORT=`python3 ../../get_available_port.py`
-RANDOM_TEXT="test.txt"
-python3 generate_random_text.py "--file_name ${RANDOM_TEXT}"
-gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
- filesrc location=${RANDOM_TEXT} ! text/x-raw,format=utf8 ! tee name=t \
- t. ! queue ! multifilesink location=raw6_%1d.log \
- t. ! queue ! edgesink port=${PORT} async=false" 6-1 0 0 30
-gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
- edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result6_%1d.log" 6-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw6_0.log result6_0.log 6-3 "Compare 6-3" 1 0
-callCompareTestIfExist raw6_1.log result6_1.log 6-4 "Compare 6-4" 1 0
-callCompareTestIfExist raw6_2.log result6_2.log 6-5 "Compare 6-5" 1 0
+ edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result5_%1d.log" 5-2 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw5_" ".log" "result5_0.log" 5-3
+callCompareTestIfExist raw5_$((num+0)).log result5_0.log 5-3 "Compare 5-3" 1 0
+callCompareTestIfExist raw5_$((num+1)).log result5_1.log 5-4 "Compare 5-4" 1 0
+callCompareTestIfExist raw5_$((num+2)).log result5_2.log 5-5 "Compare 5-5" 1 0
kill -9 $pid &> /dev/null
wait $pid
-rm ${RANDOM_TEXT}
rm *.log
report