[Gst/Edge] Add edgesrc and edgesink
authorYechan Choi <yechan9.choi@samsung.com>
Thu, 11 Aug 2022 10:18:21 +0000 (19:18 +0900)
committerSangjung Woo <again4you@gmail.com>
Thu, 22 Sep 2022 01:54:58 +0000 (10:54 +0900)
- Add new elements: edgesrc and edgesink.
- Modify Test for edgesrc and edgesink.

Signed-off-by: Yechan Choi <yechan9.choi@samsung.com>
Signed-off-by: gichan <gichan2.jang@samsung.com>
13 files changed:
gst/edge/edge_common.c
gst/edge/edge_common.h
gst/edge/edge_elements.c
gst/edge/edge_sink.c
gst/edge/edge_sink.h
gst/edge/edge_src.c
gst/edge/edge_src.h
gst/edge/meson.build
gst/meson.build
gst/nnstreamer/meson.build
meson.build
packaging/nnstreamer.spec
tests/nnstreamer_edge/edge/runTest.sh

index b4d9a5e..63115d9 100644 (file)
  * @bug     No known bugs
  *
  */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "edge_common.h"
+
+/**
+ * @brief register GEnumValue array for edge protocol property handling
+ */
+GType
+gst_edge_get_connect_type (void)
+{
+  static GType protocol = 0;
+  if (protocol == 0) {
+    static GEnumValue protocols[] = {
+      {NNS_EDGE_CONNECT_TYPE_TCP, "TCP",
+          "Directly sending stream frames via TCP connections."},
+          /** @todo support UDP, MQTT and HYBRID */
+      {0, NULL, NULL},
+    };
+    protocol = g_enum_register_static ("edge_protocol", protocols);
+  }
+
+  return protocol;
+}
index 1fb8385..ff64038 100644 (file)
 #ifndef __GST_EDGE_H__
 #define __GST_EDGE_H__
 
-G_BEGIN_DECLS
-#ifndef UNUSED
-#define UNUSED(expr) do { (void)(sizeof(x), 0); } while (0)
-#endif /* UNUSED */
+#include <glib.h>
+#include <gst/gst.h>
+#include "nnstreamer-edge.h"
+
 #ifndef GST_EDGE_PACKAGE
 #define GST_EDGE_PACKAGE "GStreamer Edge Plugins"
 #endif /* GST_EDGE_PACKAGE */
 #define GST_EDGE_ELEM_NAME_SINK "edgesink"
 #define GST_EDGE_ELEM_NAME_SRC "edgesrc"
-    G_END_DECLS
+#define DEFAULT_HOST "localhost"
+#define DEFAULT_PORT 3000
+#define DEFAULT_CONNECT_TYPE (NNS_EDGE_CONNECT_TYPE_TCP)
+#define GST_TYPE_EDGE_CONNECT_TYPE (gst_edge_get_connect_type ())
+
+G_BEGIN_DECLS
+/**
+ * @brief register GEnumValue array for edge protocol property handling
+ */
+    GType gst_edge_get_connect_type (void);
+
+G_END_DECLS
 #endif /* __GST_EDGE_H__ */
index eb995c2..b34aa05 100644 (file)
@@ -2,7 +2,7 @@
 /**
  * Copyright (C) 2022 Samsung Electronics Co., Ltd.
  *
- * @file    edge_sink.h
+ * @file    edge_elements.c
  * @date    02 Aug 2022
  * @brief   Register edge plugins
  * @author  Yechan Choi <yechan9.choi@samsung.com>
@@ -12,7 +12,6 @@
  */
 #include <gst/gst.h>
 
-#include "edge_common.h"
 #include "edge_sink.h"
 #include "edge_src.h"
 
index 5525ea5..c94968f 100644 (file)
@@ -2,7 +2,7 @@
 /**
  * Copyright (C) 2022 Samsung Electronics Co., Ltd.
  *
- * @file    edge_sink.h
+ * @file    edge_sink.c
  * @date    01 Aug 2022
  * @brief   Publish incoming streams
  * @author  Yechan Choi <yechan9.choi@samsung.com>
@@ -27,11 +27,16 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_ALWAYS,
     GST_STATIC_CAPS_ANY);
 
+/**
+ * @brief edgesink properties
+ */
 enum
 {
   PROP_0,
 
-  /** @todo define props */
+  PROP_HOST,
+  PROP_PORT,
+  PROP_CONNECT_TYPE,
 
   PROP_LAST
 };
@@ -40,7 +45,7 @@ enum
 G_DEFINE_TYPE (GstEdgeSink, gst_edgesink, GST_TYPE_BASE_SINK);
 
 static void gst_edgesink_set_property (GObject * object,
-    guint prop_id, const GValue * value, GParamSepc * pspec);
+    guint prop_id, const GValue * value, GParamSpec * pspec);
 
 static void gst_edgesink_get_property (GObject * object,
     guint prop_id, GValue * value, GParamSpec * pspec);
@@ -49,9 +54,20 @@ static void gst_edgesink_finalize (GObject * object);
 
 static gboolean gst_edgesink_start (GstBaseSink * basesink);
 static GstFlowReturn gst_edgesink_render (GstBaseSink * basesink,
-    GstBuffer * buf);
+    GstBuffer * buffer);
 static gboolean gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps);
 
+static gchar *gst_edgesink_get_host (GstEdgeSink * self);
+static void gst_edgesink_set_host (GstEdgeSink * self, const gchar * host);
+
+static guint16 gst_edgesink_get_port (GstEdgeSink * self);
+static void gst_edgesink_set_port (GstEdgeSink * self, const guint16 port);
+
+static nns_edge_connect_type_e gst_edgesink_get_connect_type (GstEdgeSink *
+    self);
+static void gst_edgesink_set_connect_type (GstEdgeSink * self,
+    const nns_edge_connect_type_e connect_type);
+
 /**
  * @brief initialize the class
  */
@@ -59,7 +75,7 @@ static void
 gst_edgesink_class_init (GstEdgeSinkClass * klass)
 {
   GObjectClass *gobject_class;
-  GstElementClass *gselement_class;
+  GstElementClass *gstelement_class;
   GstBaseSinkClass *gstbasesink_class;
 
   gstbasesink_class = (GstBaseSinkClass *) klass;
@@ -70,7 +86,19 @@ gst_edgesink_class_init (GstEdgeSinkClass * klass)
   gobject_class->get_property = gst_edgesink_get_property;
   gobject_class->finalize = gst_edgesink_finalize;
 
-  /** @todo set props */
+  g_object_class_install_property (gobject_class, PROP_HOST,
+      g_param_spec_string ("host", "Host",
+          "A self host address to accept connection from edgesrc", DEFAULT_HOST,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_PORT,
+      g_param_spec_uint ("port", "Port",
+          "A self port address to accept connection from edgesrc.",
+          0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
+      g_param_spec_enum ("connect-type", "Connect Type",
+          "The connections type between edgesink and edgesrc.",
+          GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sinktemplate));
@@ -91,21 +119,11 @@ gst_edgesink_class_init (GstEdgeSinkClass * klass)
  * @brief initialize the new element
  */
 static void
-gst_edgesink_init (GstEdgeSink * sink)
+gst_edgesink_init (GstEdgeSink * self)
 {
-  /** @todo set default value of props */
-}
-
-
-/**
- * @brief finalize the object
- */
-static void
-gst_edgesink_finalize (GObject * object)
-{
-  GstEdgeSink *self = GST_EDGESINK (object);
-  /** @todo finalize - free all pointer in element */
-  G_OBJECT_CLASS (parent_class)->finalize (object);
+  self->host = g_strdup (DEFAULT_HOST);
+  self->port = DEFAULT_PORT;
+  self->connect_type = DEFAULT_CONNECT_TYPE;
 }
 
 /**
@@ -118,7 +136,16 @@ gst_edgesink_set_property (GObject * object, guint prop_id,
   GstEdgeSink *self = GST_EDGESINK (object);
 
   switch (prop_id) {
-    /** @todo set prop */
+    case PROP_HOST:
+      gst_edgesink_set_host (self, g_value_get_string (value));
+      break;
+    case PROP_PORT:
+      gst_edgesink_set_port (self, g_value_get_uint (value));
+      break;
+    case PROP_CONNECT_TYPE:
+      gst_edgesink_set_connect_type (self, g_value_get_enum (value));
+      break;
+
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -130,12 +157,21 @@ gst_edgesink_set_property (GObject * object, guint prop_id,
  */
 static void
 gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
-    GParmSpec * pspec)
+    GParamSpec * pspec)
 {
-  GstEdgeSInk *self = GST_EDGESINK (object);
+  GstEdgeSink *self = GST_EDGESINK (object);
 
   switch (prop_id) {
-    /** @todo props */
+    case PROP_HOST:
+      g_value_set_string (value, gst_edgesink_get_host (self));
+      break;
+    case PROP_PORT:
+      g_value_set_uint (value, gst_edgesink_get_port (self));
+      break;
+    case PROP_CONNECT_TYPE:
+      g_value_set_enum (value, gst_edgesink_get_connect_type (self));
+      break;
+
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -143,23 +179,107 @@ gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
 }
 
 /**
+ * @brief finalize the object
+ */
+static void
+gst_edgesink_finalize (GObject * object)
+{
+  GstEdgeSink *self = GST_EDGESINK (object);
+  if (self->host) {
+    g_free (self->host);
+    self->host = NULL;
+  }
+
+  if (self->edge_h) {
+    nns_edge_release_handle (self->edge_h);
+    self->edge_h = NULL;
+  }
+
+  G_OBJECT_CLASS (parent_class)->finalize (object);
+}
+
+/**
  * @brief start processing of edgesink
  */
 static gboolean
 gst_edgesink_start (GstBaseSink * basesink)
 {
-  GstEdgeSink *sink = GST_EDGESINK (basesink);
+  GstEdgeSink *self = GST_EDGESINK (basesink);
+
+  int ret;
+  char *port = NULL;
+
+  ret =
+      nns_edge_create_handle ("TEMP_ID", self->connect_type,
+      NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
+
+  if (NNS_EDGE_ERROR_NONE != ret) {
+    nns_loge ("Failed to get nnstreamer edge handle.");
+
+    if (self->edge_h) {
+      nns_edge_release_handle (self->edge_h);
+      self->edge_h = NULL;
+    }
+
+    return FALSE;
+  }
+
+  nns_edge_set_info (self->edge_h, "HOST", self->host);
+  port = g_strdup_printf ("%d", self->port);
+  nns_edge_set_info (self->edge_h, "PORT", port);
+  g_free (port);
+
+  if (0 != nns_edge_start (self->edge_h)) {
+    nns_loge
+        ("Failed to start NNStreamer-edge. Please check server IP and port");
+    return FALSE;
+  }
 
-  /** @todo start */
+  return TRUE;
 }
 
 /**
  * @brief render buffer, send buffer
  */
 static GstFlowReturn
-gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buf)
+gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buffer)
 {
-  /** @todo render, send data */
+  GstEdgeSink *self = GST_EDGESINK (basesink);
+  nns_edge_data_h data_h;
+  guint i, num_mems;
+  int ret;
+  GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
+  GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
+
+  ret = nns_edge_data_create (&data_h);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_loge ("Failed to create data handle in edgesink");
+    return GST_FLOW_ERROR;
+  }
+
+  num_mems = gst_buffer_n_memory (buffer);
+  for (i = 0; i < num_mems; i++) {
+    mem[i] = gst_buffer_peek_memory (buffer, i);
+    if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
+      nns_loge ("Cannot map the %uth memory in gst-buffer", i);
+      num_mems = i;
+      goto done;
+    }
+    nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
+  }
+
+  nns_edge_send (self->edge_h, data_h);
+  goto done;
+
+done:
+  if (data_h)
+    nns_edge_data_destroy (data_h);
+
+  for (i = 0; i < num_mems; i++) {
+    gst_memory_unmap (mem[i], &map[i]);
+  }
+
+  return GST_FLOW_OK;
 }
 
 /**
@@ -168,5 +288,80 @@ gst_edgesink_render (GstBaseSink * basesink, GstBuffer * buf)
 static gboolean
 gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps)
 {
-  /** @todo set caps */
+  GstEdgeSink *sink = GST_EDGESINK (basesink);
+  gchar *caps_str, *prev_caps_str, *new_caps_str;
+  int set_rst;
+
+  caps_str = gst_caps_to_string (caps);
+
+  nns_edge_get_info (sink->edge_h, "CAPS", &prev_caps_str);
+  if (!prev_caps_str) {
+    prev_caps_str = g_strdup ("");
+  }
+  new_caps_str =
+      g_strdup_printf ("%s@edge_sink_caps@%s", prev_caps_str, caps_str);
+  set_rst = nns_edge_set_info (sink->edge_h, "CAPS", new_caps_str);
+
+  g_free (prev_caps_str);
+  g_free (new_caps_str);
+  g_free (caps_str);
+
+  return set_rst == NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief getter for the 'host' property.
+ */
+static gchar *
+gst_edgesink_get_host (GstEdgeSink * self)
+{
+  return self->host;
+}
+
+/**
+ * @brief setter for the 'host' property.
+ */
+static void
+gst_edgesink_set_host (GstEdgeSink * self, const gchar * host)
+{
+  if (self->host)
+    g_free (self->host);
+  self->host = g_strdup (host);
+}
+
+/**
+ * @brief getter for the 'port' property.
+ */
+static guint16
+gst_edgesink_get_port (GstEdgeSink * self)
+{
+  return self->port;
+}
+
+/**
+ * @brief setter for the 'port' property.
+ */
+static void
+gst_edgesink_set_port (GstEdgeSink * self, const guint16 port)
+{
+  self->port = port;
+}
+
+/**
+ * @brief getter for the 'connect_type' property.
+ */
+static nns_edge_connect_type_e
+gst_edgesink_get_connect_type (GstEdgeSink * self)
+{
+  return self->connect_type;
+}
+
+/**
+ * @brief setter for the 'connect_type' property.
+ */
+static void
+gst_edgesink_set_connect_type (GstEdgeSink * self,
+    const nns_edge_connect_type_e connect_type)
+{
+  self->connect_type = connect_type;
 }
index c8c453c..110ae70 100644 (file)
 
 #include <gst/gst.h>
 #include <gst/base/gstbasesink.h>
+#include "edge_common.h"
+#include "nnstreamer-edge.h"
+#include "../nnstreamer/nnstreamer_log.h"
+#include "tensor_typedef.h"
 
 G_BEGIN_DECLS
 #define GST_TYPE_EDGESINK \
@@ -38,6 +42,11 @@ struct _GstEdgeSink
 {
   GstBaseSink element;
 
+  gchar *host;
+  guint16 port;
+
+  nns_edge_connect_type_e connect_type;
+  nns_edge_h edge_h;
 };
 
 /**
index a9e70f5..02fa68c 100644 (file)
@@ -32,13 +32,15 @@ enum
 {
   PROP_0,
 
-  /** @todo define props */
+  PROP_DEST_HOST,
+  PROP_DEST_PORT,
+  PROP_CONNECT_TYPE,
 
   PROP_LAST
-}
+};
 
 #define gst_edgesrc_parent_class parent_class
-G_DEFINE_TYPE (GstEdgeSrc, gst_edgesrc, GST_TYPE_PUSH_SRC);
+G_DEFINE_TYPE (GstEdgeSrc, gst_edgesrc, GST_TYPE_BASE_SRC);
 
 static void gst_edgesrc_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -46,18 +48,21 @@ static void gst_edgesrc_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 static void gst_edgesrc_class_finalize (GObject * object);
 
-static GstStateChangeReturn gst_edgesrc_change_state (GstElement * element,
-    GstStateChange transition);
-
 static gboolean gst_edgesrc_start (GstBaseSrc * basesrc);
-static gboolean gst_edgesrc_stop (GstBaseSrc * basesrc);
-static GstCaps *gst_edgesrc_get_caps (GstBaseSrc * basesrc, GstCaps * filter);
-static void gst_edgesrc_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
-    GstClockTime * start, GstClockTime * end);
-static gboolean gst_edgesrc_is_seekable (GstBaseSrc * basesrc);
 static GstFlowReturn gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset,
-    guint size, GstBuffer ** buf);
-static gboolean gst_edgesrc_query (GstBaseSrc * basesrc, GstQuery * query);
+    guint size, GstBuffer ** out_buf);
+
+static gchar *gst_edgesrc_get_dest_host (GstEdgeSrc * self);
+static void gst_edgesrc_set_dest_host (GstEdgeSrc * self,
+    const gchar * dest_host);
+
+static guint16 gst_edgesrc_get_dest_port (GstEdgeSrc * self);
+static void gst_edgesrc_set_dest_port (GstEdgeSrc * self,
+    const guint16 dest_port);
+
+static nns_edge_connect_type_e gst_edgesrc_get_connect_type (GstEdgeSrc * self);
+static void gst_edgesrc_set_connect_type (GstEdgeSrc * self,
+    const nns_edge_connect_type_e connect_type);
 
 /**
  * @brief initialize the class
@@ -73,24 +78,29 @@ gst_edgesrc_class_init (GstEdgeSrcClass * klass)
   gobject_class->get_property = gst_edgesrc_get_property;
   gobject_class->finalize = gst_edgesrc_class_finalize;
 
-  /** @todo set props */
+  g_object_class_install_property (gobject_class, PROP_DEST_HOST,
+      g_param_spec_string ("dest-host", "Destination Host",
+          "A host address of edgesink to receive the packets from edgesink",
+          DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_DEST_PORT,
+      g_param_spec_uint ("dest-port", "Destination Port",
+          "A port of edgesink to receive the packets from edgesink",
+          0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
+      g_param_spec_enum ("connect-type", "Connect Type",
+          "The connections type between edgesink and edgesrc.",
+          GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&srctemplate));
 
-  gst_element_class_set_static_metadata (gelement_class,
+  gst_element_class_set_static_metadata (gstelement_class,
       "EdgeSrc", "Source/Edge",
       "Subscribe and push incoming streams", "Samsung Electronics Co., Ltd.");
 
-  gstelement_class->change_state = gst_edgesrc_change_state;
-
   gstbasesrc_class->start = gst_edgesrc_start;
-  gstbasesrc_class->stop = gst_edgesrc_stop;
-  gstbasesrc_class->get_caps = gst_edgesrc_get_caps;
-  gstbasesrc_class->get_times = gst_edgesrc_get_times;
-  gstbasesrc_class->is_seekable = gst_edgesrc_is_seekable;
   gstbasesrc_class->create = gst_edgesrc_create;
-  gstbasesrc_class->query = gst_edgesrc_query;
 
   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT,
       GST_EDGE_ELEM_NAME_SRC, 0, "Edge src");
@@ -107,7 +117,10 @@ gst_edgesrc_init (GstEdgeSrc * self)
   gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
   gst_base_src_set_async (basesrc, FALSE);
 
-  /** @todo set default value of props */
+  self->dest_host = g_strdup (DEFAULT_HOST);
+  self->dest_port = DEFAULT_PORT;
+  self->msg_queue = g_async_queue_new ();
+  self->connect_type = DEFAULT_CONNECT_TYPE;
 }
 
 /**
@@ -119,8 +132,17 @@ gst_edgesrc_set_property (GObject * object, guint prop_id, const GValue * value,
 {
   GstEdgeSrc *self = GST_EDGESRC (object);
 
-  switch (prod_id) {
-      /** @todo set prop */
+  switch (prop_id) {
+    case PROP_DEST_HOST:
+      gst_edgesrc_set_dest_host (self, g_value_get_string (value));
+      break;
+    case PROP_DEST_PORT:
+      gst_edgesrc_set_dest_port (self, g_value_get_uint (value));
+      break;
+    case PROP_CONNECT_TYPE:
+      gst_edgesrc_set_connect_type (self, g_value_get_enum (value));
+      break;
+
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -137,7 +159,16 @@ gst_edgesrc_get_property (GObject * object, guint prop_id, GValue * value,
   GstEdgeSrc *self = GST_EDGESRC (object);
 
   switch (prop_id) {
-      /** @todo props */
+    case PROP_DEST_HOST:
+      g_value_set_string (value, gst_edgesrc_get_dest_host (self));
+      break;
+    case PROP_DEST_PORT:
+      g_value_set_uint (value, gst_edgesrc_get_dest_port (self));
+      break;
+    case PROP_CONNECT_TYPE:
+      g_value_set_enum (value, gst_edgesrc_get_connect_type (self));
+      break;
+
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -151,25 +182,55 @@ static void
 gst_edgesrc_class_finalize (GObject * object)
 {
   GstEdgeSrc *self = GST_EDGESRC (object);
-  /** @todo finalize - free all pointer in element */
+  nns_edge_data_h data_h;
+
+  if (self->dest_host) {
+    g_free (self->dest_host);
+    self->dest_host = NULL;
+  }
+
+  if (self->msg_queue) {
+    while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
+      nns_edge_data_destroy (data_h);
+    }
+    g_async_queue_unref (self->msg_queue);
+    self->msg_queue = NULL;
+  }
+
+  if (self->edge_h) {
+    nns_edge_release_handle (self->edge_h);
+    self->edge_h = NULL;
+  }
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
 /**
- * @brief handle edgesrc's state change
+ * @brief nnstreamer-edge event callback.
  */
-static GstStateChangeReturn
-gst_edgesrc_change_state (GstElement * element, GstStateChange transition)
+static int
+_nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
 {
-  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+  nns_edge_event_e event_type;
+  int ret = NNS_EDGE_ERROR_NONE;
 
-  GstEdgeSrc *self = GST_EDGESRC (element);
-
-  /** @todo handle transition  */
+  GstEdgeSrc *self = GST_EDGESRC (user_data);
+  if (0 != nns_edge_event_get_type (event_h, &event_type)) {
+    nns_loge ("Failed to get event type!");
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
 
-  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+  switch (event_type) {
+    case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+    {
+      nns_edge_data_h data;
 
-  /** @todo handle transition */
+      nns_edge_event_parse_new_data (event_h, &data);
+      g_async_queue_push (self->msg_queue, data);
+      break;
+    }
+    default:
+      break;
+  }
 
   return ret;
 }
@@ -182,65 +243,155 @@ gst_edgesrc_start (GstBaseSrc * basesrc)
 {
   GstEdgeSrc *self = GST_EDGESRC (basesrc);
 
-  /** @todo start */
+  int ret;
+  char *port = NULL;
+
+  ret =
+      nns_edge_create_handle ("TEMP_ID", self->connect_type,
+      NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
+
+  if (NNS_EDGE_ERROR_NONE != ret) {
+    nns_loge ("Failed to get nnstreamer edge handle.");
+
+    if (self->edge_h) {
+      nns_edge_release_handle (self->edge_h);
+      self->edge_h = NULL;
+    }
+
+    return FALSE;
+  }
+
+  nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
+  port = g_strdup_printf ("%d", self->dest_port);
+  nns_edge_set_info (self->edge_h, "DEST_PORT", port);
+  g_free (port);
+
+  nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
+
+  if (0 != nns_edge_start (self->edge_h)) {
+    nns_loge
+        ("Failed to start NNStreamer-edge. Please check server IP and port");
+    return FALSE;
+  }
+
+  if (0 != nns_edge_connect (self->edge_h, self->dest_host, self->dest_port)) {
+    nns_loge ("Failed to connect to edge server!");
+    return FALSE;
+  }
+
+  return TRUE;
 }
 
 /**
- * @brief stop edgesrc, called when state changed ready to null
+ * @brief Create a buffer containing the subscribed data
  */
-static gboolean
-gst_edgesrc_stop (GstBaseSrc * basesrc)
+static GstFlowReturn
+gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size,
+    GstBuffer ** out_buf)
 {
   GstEdgeSrc *self = GST_EDGESRC (basesrc);
 
-  /** @todo stop */
+  nns_edge_data_h data_h;
+  GstBuffer *buffer = NULL;
+  guint i, num_data;
+  int ret;
+
+  UNUSED (offset);
+  UNUSED (size);
+
+  data_h = g_async_queue_pop (self->msg_queue);
+
+  if (!data_h) {
+    nns_loge ("Failed to get message from the edgesrc message queue.");
+    goto done;
+  }
+
+  ret = nns_edge_data_get_count (data_h, &num_data);
+  if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
+    nns_loge ("Failed to get the number of memories of the edge data.");
+    goto done;
+  }
+
+  buffer = gst_buffer_new ();
+  for (i = 0; i < num_data; i++) {
+    void *data = NULL;
+    size_t data_len = 0;
+    gpointer new_data;
+
+    nns_edge_data_get (data_h, i, &data, &data_len);
+    new_data = _g_memdup (data, data_len);
+
+    gst_buffer_append_memory (buffer,
+        gst_memory_new_wrapped (0, new_data, data_len, 0, data_len, new_data,
+            g_free));
+  }
+
+done:
+  if (data_h)
+    nns_edge_data_destroy (data_h);
+
+  if (buffer == NULL) {
+    nns_loge ("Failed to get buffer to push to the edgesrc.");
+    return GST_FLOW_ERROR;
+  }
+
+  *out_buf = buffer;
+
+  return GST_FLOW_OK;
 }
 
 /**
- * @brief Get caps of subclass
+ * @brief getter for the 'host' property.
  */
-static GstCaps *
-gst_edgesrc_get_caps (GstBaseSrc * basesrc, GstCaps * filter)
+static gchar *
+gst_edgesrc_get_dest_host (GstEdgeSrc * self)
 {
-  /** @todo get caps */
+  return self->dest_host;
 }
 
 /**
- * @brief Return the time information of the given buffer
+ * @brief setter for the 'host' property.
  */
 static void
-gst_edgesrc_get_times (GstBaseSrc * basesrc, GstBuffer * buffer,
-    GstClockTime * start, GstClockTime * end)
+gst_edgesrc_set_dest_host (GstEdgeSrc * self, const gchar * dest_host)
 {
-  /** @todo get times */
+  g_free (self->dest_host);
+  self->dest_host = g_strdup (dest_host);
 }
 
 /**
- * @brief Check if source supports seeking
+ * @brief getter for the 'port' property.
  */
-static gboolean
-gst_edgesrc_is_seekable (GstBaseSrc * basesrc)
+static guint16
+gst_edgesrc_get_dest_port (GstEdgeSrc * self)
 {
-  /** @todo is seekable */
+  return self->dest_port;
 }
 
 /**
- * @brief Create a buffer containing the subscribed data
+ * @brief setter for the 'port' property.
  */
-static GstFlowReturn
-gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset, guint size,
-    GstBuffer ** buf)
+static void
+gst_edgesrc_set_dest_port (GstEdgeSrc * self, const guint16 dest_port)
 {
-  GstEdgeSrc *self = GST_EDGESRC (basesrc);
+  self->dest_port = dest_port;
+}
 
-  /** @todo create */
+/**
+ * @brief getter for the 'connect_type' property.
+ */
+static nns_edge_connect_type_e
+gst_edgesrc_get_connect_type (GstEdgeSrc * self)
+{
+  return self->connect_type;
 }
 
 /**
- * @brief An implementation of the GstBaseSrc vmethod that handles queries
+ * @brief setter for the 'connect_type' property.
  */
-static gboolean
-gst_edgesrc_query (GstBaseSrc * basesrc, GstQuery * query)
+static void
+gst_edgesrc_set_connect_type (GstEdgeSrc * self,
+    const nns_edge_connect_type_e connect_type)
 {
-  /** @todo query */
+  self->connect_type = connect_type;
 }
index af66c6d..66465c7 100644 (file)
 #ifndef __GST_EDGE_SRC_H__
 #define __GST_EDGE_SRC_H__
 
+#include <gst/gst.h>
 #include <gst/base/gstbasesrc.h>
+#include "edge_common.h"
+#include "nnstreamer-edge.h"
+#include "nnstreamer_util.h"
+#include "../nnstreamer/nnstreamer_log.h"
 
 G_BEGIN_DECLS
 #define GST_TYPE_EDGESRC \
@@ -28,7 +33,7 @@ G_BEGIN_DECLS
     (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_EDGESRC))
 #define GST_EDGESRC_CAST(obj) ((GstEdgeSrc *) (obj))
 typedef struct _GstEdgeSrc GstEdgeSrc;
-typedef struct _GstEdgeSrcClass GstEdgeSinkClass;
+typedef struct _GstEdgeSrcClass GstEdgeSrcClass;
 
 /**
  * @brief GstEdgeSrc data structure.
@@ -36,7 +41,14 @@ typedef struct _GstEdgeSrcClass GstEdgeSinkClass;
 struct _GstEdgeSrc
 {
   GstBaseSrc element;
-}
+
+  gchar *dest_host;
+  guint16 dest_port;
+
+  nns_edge_connect_type_e connect_type;
+  nns_edge_h edge_h;
+  GAsyncQueue *msg_queue;
+};
 
 /**
  * @brief GstEdgeSrcClass data structure.
@@ -44,7 +56,7 @@ struct _GstEdgeSrc
 struct _GstEdgeSrcClass
 {
   GstBaseSrcClass parent_class;
-}
+};
 
 GType gst_edgesrc_get_type (void);
 
index 1ef797b..d6514b6 100644 (file)
@@ -7,11 +7,18 @@ edge_files = [
 
 edge_srcs = []
 edge_dep = [
-    glib_dep, 
+    glib_dep,
     gst_base_dep,
     gst_dep,
+    nnstreamer_edge_support_deps
 ]
 
+if build_platform == 'tizen'
+  edge_dep += dlog_dep
+elif cc.has_header_symbol('android/log.h', '__android_log_print')
+  edge_dep += cc.find_library('log')
+endif
+
 foreach s : edge_files
     edge_srcs += join_paths(meson.current_source_dir(), s)
 endforeach
index f500695..88452cc 100644 (file)
@@ -1,4 +1,6 @@
-subdir('edge')
+if nnstreamer_edge_support_is_available
+  subdir('edge')
+endif
 subdir('join')
 if mqtt_support_is_available
   subdir('mqtt')
index 9766500..2e98a68 100644 (file)
@@ -13,7 +13,7 @@ nnstreamer_single_deps = [
 # log utils
 # TODO: Define proper variable for android in entire project
 if build_platform == 'tizen'
-  nnstreamer_single_deps += dependency('dlog')
+  nnstreamer_single_deps += dlog_dep
 elif cc.has_header_symbol('android/log.h', '__android_log_print')
   nnstreamer_single_deps += cc.find_library('log')
 endif
index c3c64b8..8b4a4ab 100644 (file)
@@ -31,6 +31,7 @@ if get_option('enable-tizen')
 
   tizenVmajor = get_option('tizen-version-major')
   add_project_arguments('-DTIZENVERSION='+tizenVmajor.to_string(), language: ['c', 'cpp'])
+  dlog_dep = dependency('dlog')
 elif not meson.is_cross_build()
   if cc.get_id() == 'clang' and cxx.get_id() == 'clang'
     if build_machine.system() == 'darwin'
index 3a9de1f..cc0ab5e 100644 (file)
@@ -984,6 +984,7 @@ cp -r result %{buildroot}%{_datadir}/nnstreamer/unittest/
 %{_prefix}/lib/nnstreamer/decoders/libnnstreamer_decoder_octet_stream.so
 %{_prefix}/lib/nnstreamer/filters/libnnstreamer_filter_cpp.so
 %{gstlibdir}/libnnstreamer.so
+%{gstlibdir}/libgstedge.so
 %{_libdir}/libnnstreamer.so
 
 %files single
index e43369d..7da1ead 100644 (file)
@@ -44,6 +44,59 @@ function callCompareTestIfExist() {
     callCompareTest $1 $2 $3 "$4" $5 $6
 }
 
+## @brief Find the number of the first file that matches
+## @param $1 Raw file prefix
+## @param $2 Raw file suffix
+## @param $3 Result file name
+## @param $4 Test Case ID
+function findFirstMatchedFileNumber() {
+    num=-1
+    result_file="$3"
+
+    if [[ ! -f "$result_file" ]]; then
+        echo "$3 don't exist."
+        return
+    fi
+
+    command -v cmp
+    
+    if [[ $? == 0 && ! -L $(which cmp) ]]; then
+        cmp_exist=1
+    else
+        cmp_exist=0
+    fi
+
+    while :
+    do  
+        num=$((num+1))
+        raw_file="$1$num$2"
+
+        if [[ ! -f "$raw_file" ]]; then
+            echo "$raw_file don't exist."
+            num=-1
+            return
+        fi
+
+        if [[ "$cmp_exist" -eq "1" ]]; then
+            # use cmp - callCompareTest is too verbose
+            cmp $raw_file $result_file
+            output=$?
+            if [[ "$output" -eq "1" ]]; then
+                output=0
+            else
+                output=1
+            fi
+        else
+            # use `callCompareTest` if cmp is not exist
+            callCompareTest $raw_file $result_file "$4-$num" "find the number of the first file that matches for test $4" 0 1
+        fi
+
+        if [[ "$output" -eq "1" ]]; then
+            return
+        fi
+    done
+}
+
 # Run edge sink server as echo server with default address option. 
 PORT=`python3 ../../get_available_port.py`
 gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
@@ -51,17 +104,18 @@ gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
         t. ! queue ! multifilesink location=raw1_%1d.log \
         t. ! queue ! edgesink port=${PORT} async=false" 1-1 0 0 30
 gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
-    edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result1_%1d.log" 1-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw1_0.log result1_0.log 1-3 "Compare 1-3" 1 0
-callCompareTestIfExist raw1_1.log result1_1.log 1-4 "Compare 1-4" 1 0
-callCompareTestIfExist raw1_2.log result1_2.log 1-5 "Compare 1-5" 1 0
-callCompareTestIfExist raw1_3.log result1_3.log 1-6 "Compare 1-6" 1 0
-callCompareTestIfExist raw1_4.log result1_4.log 1-7 "Compare 1-7" 1 0
-callCompareTestIfExist raw1_5.log result1_5.log 1-8 "Compare 1-8" 1 0
-callCompareTestIfExist raw1_6.log result1_6.log 1-9 "Compare 1-9" 1 0
-callCompareTestIfExist raw1_7.log result1_7.log 1-10 "Compare 1-10" 1 0
-callCompareTestIfExist raw1_8.log result1_8.log 1-11 "Compare 1-11" 1 0
-callCompareTestIfExist raw1_9.log result1_9.log 1-12 "Compare 1-12" 1 0
+    edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result1_%1d.log" 1-2 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw1_" ".log" "result1_0.log" 1-3
+callCompareTestIfExist raw1_$((num+0)).log result1_0.log 1-3 "Compare 1-3" 1 0
+callCompareTestIfExist raw1_$((num+1)).log result1_1.log 1-4 "Compare 1-4" 1 0
+callCompareTestIfExist raw1_$((num+2)).log result1_2.log 1-5 "Compare 1-5" 1 0
+callCompareTestIfExist raw1_$((num+3)).log result1_3.log 1-6 "Compare 1-6" 1 0
+callCompareTestIfExist raw1_$((num+4)).log result1_4.log 1-7 "Compare 1-7" 1 0
+callCompareTestIfExist raw1_$((num+5)).log result1_5.log 1-8 "Compare 1-8" 1 0
+callCompareTestIfExist raw1_$((num+6)).log result1_6.log 1-9 "Compare 1-9" 1 0
+callCompareTestIfExist raw1_$((num+7)).log result1_7.log 1-10 "Compare 1-10" 1 0
+callCompareTestIfExist raw1_$((num+8)).log result1_8.log 1-11 "Compare 1-11" 1 0
+callCompareTestIfExist raw1_$((num+9)).log result1_9.log 1-12 "Compare 1-12" 1 0
 kill -9 $pid &> /dev/null
 wait $pid
 
@@ -72,15 +126,17 @@ gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
         t. ! queue ! multifilesink location=raw2_%1d.log \
         t. ! queue ! edgesink port=${PORT} async=false" 2-1 0 0 30
 gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
-    edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result2_0_%1d.log" 2-2 0 0 $PERFORMANCE
+    edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result2_0_%1d.log" 2-2 0 0 $PERFORMANCE
 gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
-    edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result2_1_%1d.log" 2-3 0 0 $PERFORMANCE
-callCompareTestIfExist raw2_0.log result2_0_0.log 2-4 "Compare 2-4" 1 0
-callCompareTestIfExist raw2_0.log result2_1_0.log 2-5 "Compare 2-5" 1 0
-callCompareTestIfExist raw2_1.log result2_0_1.log 2-6 "Compare 2-6" 1 0
-callCompareTestIfExist raw2_1.log result2_1_1.log 2-7 "Compare 2-7" 1 0
-callCompareTestIfExist raw2_2.log result2_0_2.log 2-8 "Compare 2-8" 1 0
-callCompareTestIfExist raw2_2.log result2_1_2.log 2-9 "Compare 2-9" 1 0
+    edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result2_1_%1d.log" 2-3 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw2_" ".log" "result2_0_0.log" 2-4
+callCompareTestIfExist raw2_$((num+0)).log result2_0_0.log 2-4 "Compare 2-4" 1 0
+callCompareTestIfExist raw2_$((num+1)).log result2_0_1.log 2-5 "Compare 2-5" 1 0
+callCompareTestIfExist raw2_$((num+2)).log result2_0_2.log 2-6 "Compare 2-6" 1 0
+findFirstMatchedFileNumber "raw2_" ".log" "result2_1_0.log" 2-7
+callCompareTestIfExist raw2_$((num+0)).log result2_1_0.log 2-7 "Compare 2-7" 1 0
+callCompareTestIfExist raw2_$((num+1)).log result2_1_1.log 2-8 "Compare 2-8" 1 0
+callCompareTestIfExist raw2_$((num+2)).log result2_1_2.log 2-9 "Compare 2-9" 1 0
 kill -9 $pid &> /dev/null
 wait $pid
 
@@ -91,10 +147,11 @@ gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
         t. ! queue ! multifilesink location=raw3_%1d.log \
         t. ! queue ! edgesink port=${PORT} async=false" 3-1 0 0 30
 gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
-    edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result3_%1d.log" 3-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw3_0.log result3_0.log 3-3 "Compare 3-3" 1 0
-callCompareTestIfExist raw3_1.log result3_1.log 3-4 "Compare 3-4" 1 0
-callCompareTestIfExist raw3_2.log result3_2.log 3-5 "Compare 3-5" 1 0
+    edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result3_%1d.log" 3-2 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw3_" ".log" "result3_0.log" 3-3
+callCompareTestIfExist raw3_$((num+0)).log result3_0.log 3-3 "Compare 3-3" 1 0
+callCompareTestIfExist raw3_$((num+1)).log result3_1.log 3-4 "Compare 3-4" 1 0
+callCompareTestIfExist raw3_$((num+2)).log result3_2.log 3-5 "Compare 3-5" 1 0
 kill -9 $pid &> /dev/null
 wait $pid
 
@@ -103,12 +160,13 @@ PORT=`python3 ../../get_available_port.py`
 gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
     videotestsrc is-live=true ! videoconvert ! videoscale ! video/x-raw,width=300,height=300,format=RGB ! tensor_converter ! other/tensors,num_tensors=1,dimensions=3:300:300:1,types=uint8,format=static ! tee name=t \
         t. ! queue ! multifilesink location=raw4_%1d.log \
-        t. ! queue ! edgesink port=${PORT} asnyc=false" 4-1 0 0 30
+        t. ! queue ! edgesink port=${PORT} async=false" 4-1 0 0 30
 gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
-    edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result4_%1d.log" 4-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw4_0.log result4_0.log 4-3 "Compare 4-3" 1 0
-callCompareTestIfExist raw4_1.log result4_1.log 4-4 "Compare 4-4" 1 0
-callCompareTestIfExist raw4_2.log result4_2.log 4-5 "Compare 4-5" 1 0
+    edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result4_%1d.log" 4-2 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw4_" ".log" "result4_0.log" 4-3
+callCompareTestIfExist raw4_$((num+0)).log result4_0.log 4-3 "Compare 4-3" 1 0
+callCompareTestIfExist raw4_$((num+1)).log result4_1.log 4-4 "Compare 4-4" 1 0
+callCompareTestIfExist raw4_$((num+2)).log result4_2.log 4-5 "Compare 4-5" 1 0
 kill -9 $pid &> /dev/null
 wait $pid
 
@@ -119,29 +177,13 @@ gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
         t. ! queue ! multifilesink location=raw5_%1d.log \
         t. ! queue ! edgesink port=${PORT} async=false" 5-1 0 0 30
 gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
-    edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result5_%1d.log" 5-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw5_0.log result5_0.log 5-3 "Compare 5-3" 1 0
-callCompareTestIfExist raw5_1.log result5_1.log 5-4 "Compare 5-4" 1 0
-callCompareTestIfExist raw5_2.log result5_2.log 5-5 "Compare 5-5" 1 0
-kill -9 $pid &> /dev/null
-wait $pid
-
-# Cap: Text
-PORT=`python3 ../../get_available_port.py`
-RANDOM_TEXT="test.txt"
-python3 generate_random_text.py "--file_name ${RANDOM_TEXT}"
-gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
-    filesrc location=${RANDOM_TEXT} ! text/x-raw,format=utf8 ! tee name=t \
-        t. ! queue ! multifilesink location=raw6_%1d.log \
-        t. ! queue ! edgesink port=${PORT} async=false" 6-1 0 0 30
-gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
-    edgesrc port=${PORT} num-buffers=10 ! multifilesink location=result6_%1d.log" 6-2 0 0 $PERFORMANCE
-callCompareTestIfExist raw6_0.log result6_0.log 6-3 "Compare 6-3" 1 0
-callCompareTestIfExist raw6_1.log result6_1.log 6-4 "Compare 6-4" 1 0
-callCompareTestIfExist raw6_2.log result6_2.log 6-5 "Compare 6-5" 1 0
+    edgesrc dest-port=${PORT} num-buffers=10 ! multifilesink location=result5_%1d.log" 5-2 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw5_" ".log" "result5_0.log" 5-3
+callCompareTestIfExist raw5_$((num+0)).log result5_0.log 5-3 "Compare 5-3" 1 0
+callCompareTestIfExist raw5_$((num+1)).log result5_1.log 5-4 "Compare 5-4" 1 0
+callCompareTestIfExist raw5_$((num+2)).log result5_2.log 5-5 "Compare 5-5" 1 0
 kill -9 $pid &> /dev/null
 wait $pid
-rm ${RANDOM_TEXT}
 
 rm *.log
 report