[EDGE] Support AITT as connection type.
authorgichan <gichan2.jang@samsung.com>
Wed, 14 Sep 2022 04:34:17 +0000 (13:34 +0900)
committerSangjung Woo <again4you@gmail.com>
Mon, 26 Sep 2022 05:59:55 +0000 (14:59 +0900)
Edgesrc/sink support AITT as connection type.
Add unit test cases.

Signed-off-by: gichan <gichan2.jang@samsung.com>
13 files changed:
gst/edge/edge_common.c
gst/edge/edge_sink.c
gst/edge/edge_sink.h
gst/edge/edge_src.c
gst/edge/edge_src.h
meson.build
meson_options.txt
packaging/nnstreamer.spec
tests/nnstreamer_edge/edge/runTest.sh
tests/nnstreamer_edge/edge/unittest_edge.cc
tests/nnstreamer_edge/query/unittest_query.cc
tests/unittest_util.c
tests/unittest_util.h

index 63115d9..dd3a1d1 100644 (file)
@@ -27,7 +27,8 @@ gst_edge_get_connect_type (void)
     static GEnumValue protocols[] = {
       {NNS_EDGE_CONNECT_TYPE_TCP, "TCP",
           "Directly sending stream frames via TCP connections."},
-          /** @todo support UDP, MQTT and HYBRID */
+      {NNS_EDGE_CONNECT_TYPE_AITT, "AITT",
+          "Sending stream frames via AITT connections."},
       {0, NULL, NULL},
     };
     protocol = g_enum_register_static ("edge_protocol", protocols);
index c94968f..68f0b8c 100644 (file)
@@ -36,10 +36,15 @@ enum
 
   PROP_HOST,
   PROP_PORT,
+  PROP_DEST_HOST,
+  PROP_DEST_PORT,
   PROP_CONNECT_TYPE,
+  PROP_TOPIC,
 
   PROP_LAST
 };
+#define DEFAULT_MQTT_HOST "tcp://localhost"
+#define DEFAULT_MQTT_PORT 1883
 
 #define gst_edgesink_parent_class parent_class
 G_DEFINE_TYPE (GstEdgeSink, gst_edgesink, GST_TYPE_BASE_SINK);
@@ -99,6 +104,20 @@ gst_edgesink_class_init (GstEdgeSinkClass * klass)
           "The connections type between edgesink and edgesrc.",
           GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_DEST_HOST,
+      g_param_spec_string ("dest-host", "Destination Host",
+          "The destination hostname of the broker", DEFAULT_MQTT_HOST,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_DEST_PORT,
+      g_param_spec_uint ("dest-port", "Destination Port",
+          "The destination port of the broker", 0,
+          65535, DEFAULT_MQTT_PORT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_TOPIC,
+      g_param_spec_string ("topic", "Topic",
+          "The main topic of the host and option if necessary. "
+          "(topic)/(optional topic for main topic).", "",
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sinktemplate));
@@ -123,6 +142,9 @@ gst_edgesink_init (GstEdgeSink * self)
 {
   self->host = g_strdup (DEFAULT_HOST);
   self->port = DEFAULT_PORT;
+  self->dest_host = g_strdup (DEFAULT_HOST);
+  self->dest_port = DEFAULT_PORT;
+  self->topic = NULL;
   self->connect_type = DEFAULT_CONNECT_TYPE;
 }
 
@@ -142,10 +164,28 @@ gst_edgesink_set_property (GObject * object, guint prop_id,
     case PROP_PORT:
       gst_edgesink_set_port (self, g_value_get_uint (value));
       break;
+    case PROP_DEST_HOST:
+      if (!g_value_get_string (value)) {
+        nns_logw ("dest host property cannot be NULL");
+        break;
+      }
+      g_free (self->dest_host);
+      self->dest_host = g_value_dup_string (value);
+      break;
+    case PROP_DEST_PORT:
+      self->dest_port = g_value_get_uint (value);
+      break;
     case PROP_CONNECT_TYPE:
       gst_edgesink_set_connect_type (self, g_value_get_enum (value));
       break;
-
+    case PROP_TOPIC:
+      if (!g_value_get_string (value)) {
+        nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
+        break;
+      }
+      g_free (self->topic);
+      self->topic = g_value_dup_string (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -168,10 +208,18 @@ gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_PORT:
       g_value_set_uint (value, gst_edgesink_get_port (self));
       break;
+    case PROP_DEST_HOST:
+      g_value_set_string (value, self->dest_host);
+      break;
+    case PROP_DEST_PORT:
+      g_value_set_uint (value, self->dest_port);
+      break;
     case PROP_CONNECT_TYPE:
       g_value_set_enum (value, gst_edgesink_get_connect_type (self));
       break;
-
+    case PROP_TOPIC:
+      g_value_set_string (value, self->topic);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -210,7 +258,7 @@ gst_edgesink_start (GstBaseSink * basesink)
   char *port = NULL;
 
   ret =
-      nns_edge_create_handle ("TEMP_ID", self->connect_type,
+      nns_edge_create_handle ("TEMP_ID_EDGE_SINK", self->connect_type,
       NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
 
   if (NNS_EDGE_ERROR_NONE != ret) {
@@ -224,10 +272,22 @@ gst_edgesink_start (GstBaseSink * basesink)
     return FALSE;
   }
 
-  nns_edge_set_info (self->edge_h, "HOST", self->host);
-  port = g_strdup_printf ("%d", self->port);
-  nns_edge_set_info (self->edge_h, "PORT", port);
-  g_free (port);
+  if (self->host)
+    nns_edge_set_info (self->edge_h, "HOST", self->host);
+  if (self->port > 0) {
+    port = g_strdup_printf ("%u", self->port);
+    nns_edge_set_info (self->edge_h, "PORT", port);
+    g_free (port);
+  }
+  if (self->dest_host)
+    nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
+  if (self->dest_port > 0) {
+    port = g_strdup_printf ("%u", self->dest_port);
+    nns_edge_set_info (self->edge_h, "DEST_PORT", port);
+    g_free (port);
+  }
+  if (self->topic)
+    nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
 
   if (0 != nns_edge_start (self->edge_h)) {
     nns_loge
index 110ae70..b326a9d 100644 (file)
@@ -44,6 +44,9 @@ struct _GstEdgeSink
 
   gchar *host;
   guint16 port;
+  gchar *dest_host;
+  guint16 dest_port;
+  gchar *topic;
 
   nns_edge_connect_type_e connect_type;
   nns_edge_h edge_h;
index 02fa68c..687c87f 100644 (file)
@@ -31,10 +31,12 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
 enum
 {
   PROP_0,
-
+  PROP_HOST,
+  PROP_PORT,
   PROP_DEST_HOST,
   PROP_DEST_PORT,
   PROP_CONNECT_TYPE,
+  PROP_TOPIC,
 
   PROP_LAST
 };
@@ -78,6 +80,14 @@ gst_edgesrc_class_init (GstEdgeSrcClass * klass)
   gobject_class->get_property = gst_edgesrc_get_property;
   gobject_class->finalize = gst_edgesrc_class_finalize;
 
+  g_object_class_install_property (gobject_class, PROP_HOST,
+      g_param_spec_string ("host", "Host",
+          "A self host address", DEFAULT_HOST,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_PORT,
+      g_param_spec_uint ("port", "Port",
+          "A self port number.",
+          0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, PROP_DEST_HOST,
       g_param_spec_string ("dest-host", "Destination Host",
           "A host address of edgesink to receive the packets from edgesink",
@@ -91,6 +101,11 @@ gst_edgesrc_class_init (GstEdgeSrcClass * klass)
           "The connections type between edgesink and edgesrc.",
           GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_TOPIC,
+      g_param_spec_string ("topic", "Topic",
+          "The main topic of the host and option if necessary. "
+          "(topic)/(optional topic for main topic).", "",
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&srctemplate));
@@ -117,8 +132,11 @@ gst_edgesrc_init (GstEdgeSrc * self)
   gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
   gst_base_src_set_async (basesrc, FALSE);
 
+  self->host = g_strdup (DEFAULT_HOST);
+  self->port = DEFAULT_PORT;
   self->dest_host = g_strdup (DEFAULT_HOST);
   self->dest_port = DEFAULT_PORT;
+  self->topic = NULL;
   self->msg_queue = g_async_queue_new ();
   self->connect_type = DEFAULT_CONNECT_TYPE;
 }
@@ -133,6 +151,17 @@ gst_edgesrc_set_property (GObject * object, guint prop_id, const GValue * value,
   GstEdgeSrc *self = GST_EDGESRC (object);
 
   switch (prop_id) {
+    case PROP_HOST:
+      if (!g_value_get_string (value)) {
+        nns_logw ("host property cannot be NULL");
+        break;
+      }
+      g_free (self->host);
+      self->host = g_value_dup_string (value);
+      break;
+    case PROP_PORT:
+      self->port = g_value_get_uint (value);
+      break;
     case PROP_DEST_HOST:
       gst_edgesrc_set_dest_host (self, g_value_get_string (value));
       break;
@@ -142,7 +171,14 @@ gst_edgesrc_set_property (GObject * object, guint prop_id, const GValue * value,
     case PROP_CONNECT_TYPE:
       gst_edgesrc_set_connect_type (self, g_value_get_enum (value));
       break;
-
+    case PROP_TOPIC:
+      if (!g_value_get_string (value)) {
+        nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
+        break;
+      }
+      g_free (self->topic);
+      self->topic = g_value_dup_string (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -159,6 +195,12 @@ gst_edgesrc_get_property (GObject * object, guint prop_id, GValue * value,
   GstEdgeSrc *self = GST_EDGESRC (object);
 
   switch (prop_id) {
+    case PROP_HOST:
+      g_value_set_string (value, self->host);
+      break;
+    case PROP_PORT:
+      g_value_set_uint (value, self->port);
+      break;
     case PROP_DEST_HOST:
       g_value_set_string (value, gst_edgesrc_get_dest_host (self));
       break;
@@ -168,7 +210,9 @@ gst_edgesrc_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_CONNECT_TYPE:
       g_value_set_enum (value, gst_edgesrc_get_connect_type (self));
       break;
-
+    case PROP_TOPIC:
+      g_value_set_string (value, self->topic);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -247,7 +291,7 @@ gst_edgesrc_start (GstBaseSrc * basesrc)
   char *port = NULL;
 
   ret =
-      nns_edge_create_handle ("TEMP_ID", self->connect_type,
+      nns_edge_create_handle ("TEMP_ID_EDGE_SRC", self->connect_type,
       NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
 
   if (NNS_EDGE_ERROR_NONE != ret) {
@@ -261,10 +305,22 @@ gst_edgesrc_start (GstBaseSrc * basesrc)
     return FALSE;
   }
 
-  nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
-  port = g_strdup_printf ("%d", self->dest_port);
-  nns_edge_set_info (self->edge_h, "DEST_PORT", port);
-  g_free (port);
+  if (self->host)
+    nns_edge_set_info (self->edge_h, "HOST", self->host);
+  if (self->port > 0) {
+    port = g_strdup_printf ("%u", self->port);
+    nns_edge_set_info (self->edge_h, "PORT", port);
+    g_free (port);
+  }
+  if (self->dest_host)
+    nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
+  if (self->dest_port > 0) {
+    port = g_strdup_printf ("%u", self->dest_port);
+    nns_edge_set_info (self->edge_h, "DEST_PORT", port);
+    g_free (port);
+  }
+  if (self->topic)
+    nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
 
   nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
 
index 66465c7..e11d855 100644 (file)
@@ -42,8 +42,11 @@ struct _GstEdgeSrc
 {
   GstBaseSrc element;
 
+  gchar *host;
+  guint16 port;
   gchar *dest_host;
   guint16 dest_port;
+  gchar *topic;
 
   nns_edge_connect_type_e connect_type;
   nns_edge_h edge_h;
index 8b4a4ab..34933b2 100644 (file)
@@ -425,6 +425,10 @@ features = {
   'mxnet-support': {
     'extra_deps': [ mxnet_dep ],
     'project_args': { 'ENABLE_MXNET' : 1 }
+  },
+  'aitt-support': {
+    'target': 'aitt',
+    'project_args': { 'ENABLE_AITT' : 1 }
   }
 }
 
index b114ed2..c728c5a 100644 (file)
@@ -25,6 +25,7 @@ option('tvm-support', type: 'feature', value: 'auto')
 option('trix-engine-support', type: 'feature', value: 'auto')
 option('nnstreamer-edge-support', type: 'feature', value: 'auto')
 option('mxnet-support', type: 'feature', value: 'auto')
+option('aitt-support', type: 'feature', value: 'auto')
 
 # booleans & other options
 option('enable-test', type: 'boolean', value: true)
index f6042f4..270edec 100644 (file)
@@ -35,6 +35,7 @@
 %define                tvm_support 1
 %define                snpe_support 1
 %define                trix_engine_support 1
+%define                aitt_support 1
 # Support AI offloading (tensor_query) using nnstreamer-edge interface
 %define                nnstreamer_edge_support 1
 
 %define                snpe_support 0
 %define                trix_engine_support 0
 %define                nnstreamer_edge_support 0
+%define                aitt_support 0
 %endif
 
 # DA requested to remove unnecessary module builds
 %define                mqtt_support 0
 %define                tvm_support 0
 %define                trix_engine_support 0
+%define                aitt_support 0
 %endif
 
 # Release unit test suite as a subpackage only if check_test is enabled.
@@ -229,6 +232,9 @@ BuildConflicts: libarmcl-release
 %if 0%{?edgetpu_support}
 BuildRequires: pkgconfig(edgetpu)
 %endif
+%if 0%{?aitt_support}
+BuildRequires:  aitt-devel
+%endif
 
 %if 0%{?testcoverage}
 # to be compatible with gcc-9, lcov should have a higher version than 1.14.1
index 7da1ead..650ead3 100644 (file)
@@ -185,6 +185,34 @@ callCompareTestIfExist raw5_$((num+2)).log result5_2.log 5-5 "Compare 5-5" 1 0
 kill -9 $pid &> /dev/null
 wait $pid
 
+# Check AITT lib is exist or not.
+/sbin/ldconfig -p | grep libaitt.so >/dev/null 2>&1
+if [[ "$?" != 0 ]]; then
+    echo "AITT lib is not installed. Skip AITT test."
+    rm *.log
+    report
+    exit
+fi
+
+gstTestBackground "--gst-plugin-path=${PATH_TO_PLUGIN} \
+    videotestsrc is-live=true ! videoconvert ! videoscale ! video/x-raw,width=300,height=300,format=RGB ! tee name=t \
+        t. ! queue ! multifilesink location=raw6_%1d.log \
+        t. ! queue ! edgesink port=0 connect-type=AITT dest-host=127.0.0.1 dest-port=1883 topic=tempTopic async=false" 6-1 0 0 30
+gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
+    edgesrc port=0 connect-type=AITT dest-host=127.0.0.1 dest-port=1883 topic=tempTopic num-buffers=10 ! multifilesink location=result6_0_%1d.log" 6-2 0 0 $PERFORMANCE
+gstTest "--gst-plugin-path=${PATH_TO_PLUGIN} \
+    edgesrc port=0 connect-type=AITT dest-host=127.0.0.1 dest-port=1883 topic=tempTopic num-buffers=10 ! multifilesink location=result6_1_%1d.log" 6-3 0 0 $PERFORMANCE
+findFirstMatchedFileNumber "raw6_" ".log" "result6_0_0.log" 6-4
+callCompareTestIfExist raw6_$((num+0)).log result6_0_0.log 2-4 "Compare 6-4" 1 0
+callCompareTestIfExist raw6_$((num+1)).log result6_0_1.log 2-5 "Compare 6-5" 1 0
+callCompareTestIfExist raw6_$((num+2)).log result6_0_2.log 2-6 "Compare 6-6" 1 0
+findFirstMatchedFileNumber "raw6_" ".log" "result6_1_0.log" 6-7
+callCompareTestIfExist raw6_$((num+0)).log result6_1_0.log 2-7 "Compare 6-7" 1 0
+callCompareTestIfExist raw6_$((num+1)).log result6_1_1.log 2-8 "Compare 6-8" 1 0
+callCompareTestIfExist raw6_$((num+2)).log result6_1_2.log 2-9 "Compare 6-9" 1 0
+kill -9 $pid &> /dev/null
+wait $pid
+
 rm *.log
 report
 exit
index 70bc87f..3852685 100644 (file)
 #include <gtest/gtest.h>
 #include <glib.h>
 #include <gst/gst.h>
+#include "unittest_util.h"
+#include <gst/app/gstappsrc.h>
+#include "nnstreamer_log.h"
+
+static int data_received;
+
+/**
+ * @brief Test for edgesink get and set properties.
+ */
+TEST (edgeSink, properties0)
+{
+  gchar *pipeline;
+  GstElement *gstpipe;
+  GstElement *edge_handle;
+  gint int_val;
+  guint uint_val;
+  gchar *str_val;
+
+  /* Create a nnstreamer pipeline */
+  pipeline = g_strdup_printf (
+      "gst-launch-1.0 videotestsrc ! videoconvert ! videoscale ! "
+      "video/x-raw,width=320,height=240,format=RGB,framerate=10/1 ! "
+      "tensor_converter ! edgesink name=sinkx port=0");
+  gstpipe = gst_parse_launch (pipeline, NULL);
+  EXPECT_NE (gstpipe, nullptr);
+
+  edge_handle = gst_bin_get_by_name (GST_BIN (gstpipe), "sinkx");
+  EXPECT_NE (edge_handle, nullptr);
+
+  /* Set/Get properties of edgesink */
+  g_object_set (edge_handle, "host", "127.0.0.2", NULL);
+  g_object_get (edge_handle, "host", &str_val, NULL);
+  EXPECT_STREQ ("127.0.0.2", str_val);
+  g_free (str_val);
+
+  g_object_set (edge_handle, "port", 5001U, NULL);
+  g_object_get (edge_handle, "port", &uint_val, NULL);
+  EXPECT_EQ (5001U, uint_val);
+
+  g_object_set (edge_handle, "dest-host", "127.0.0.2", NULL);
+  g_object_get (edge_handle, "dest-host", &str_val, NULL);
+  EXPECT_STREQ ("127.0.0.2", str_val);
+  g_free (str_val);
+
+  g_object_set (edge_handle, "dest-port", 5001U, NULL);
+  g_object_get (edge_handle, "dest-port", &uint_val, NULL);
+  EXPECT_EQ (5001U, uint_val);
+
+  g_object_set (edge_handle, "connect-type", 0, NULL);
+  g_object_get (edge_handle, "connect-type", &int_val, NULL);
+  EXPECT_EQ (0, int_val);
+
+  g_object_set (edge_handle, "topic", "TEMP_TEST_TOPIC", NULL);
+  g_object_get (edge_handle, "topic", &str_val, NULL);
+  EXPECT_STREQ ("TEMP_TEST_TOPIC", str_val);
+  g_free (str_val);
+
+  gst_object_unref (edge_handle);
+  gst_object_unref (gstpipe);
+  g_free (pipeline);
+}
+
+/**
+ * @brief Test for edgesink with invalid host name.
+ */
+TEST (edgeSink, properties2_n)
+{
+  gchar *pipeline;
+  GstElement *gstpipe;
+
+  /* Create a nnstreamer pipeline */
+  pipeline = g_strdup_printf (
+      "gst-launch-1.0 videotestsrc ! videoconvert ! videoscale ! "
+      "video/x-raw,width=320,height=240,format=RGB,framerate=10/1 ! "
+      "tensor_converter ! edgesink host=f.a.i.l name=sinkx port=0");
+  gstpipe = gst_parse_launch (pipeline, NULL);
+  EXPECT_NE (gstpipe, nullptr);
+
+  EXPECT_NE (setPipelineStateSync (gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+
+  gst_object_unref (gstpipe);
+  g_free (pipeline);
+}
+
+/**
+ * @brief Test for edgesrc get and set properties.
+ */
+TEST (edgeSrc, properties0)
+{
+  gchar *pipeline;
+  GstElement *gstpipe;
+  GstElement *edge_handle;
+  gint int_val;
+  guint uint_val;
+  gchar *str_val;
+
+  /* Create a nnstreamer pipeline */
+  pipeline = g_strdup_printf (
+      "gst-launch-1.0 edgesrc port=0 name=srcx ! "
+      "other/tensors,num_tensors=1,dimensions=3:320:240:1,types=uint8,format=static,framerate=30/1 ! "
+      "tensor_sink");
+  gstpipe = gst_parse_launch (pipeline, NULL);
+  EXPECT_NE (gstpipe, nullptr);
+
+  edge_handle = gst_bin_get_by_name (GST_BIN (gstpipe), "srcx");
+  EXPECT_NE (edge_handle, nullptr);
+
+  /* Set/Get properties of edgesrc */
+  g_object_set (edge_handle, "host", "127.0.0.2", NULL);
+  g_object_get (edge_handle, "host", &str_val, NULL);
+  EXPECT_STREQ ("127.0.0.2", str_val);
+  g_free (str_val);
+
+  g_object_set (edge_handle, "port", 5001U, NULL);
+  g_object_get (edge_handle, "port", &uint_val, NULL);
+  EXPECT_EQ (5001U, uint_val);
+
+  g_object_set (edge_handle, "dest-host", "127.0.0.2", NULL);
+  g_object_get (edge_handle, "dest-host", &str_val, NULL);
+  EXPECT_STREQ ("127.0.0.2", str_val);
+  g_free (str_val);
+
+  g_object_set (edge_handle, "dest-port", 5001U, NULL);
+  g_object_get (edge_handle, "dest-port", &uint_val, NULL);
+  EXPECT_EQ (5001U, uint_val);
+
+  g_object_set (edge_handle, "connect-type", 0, NULL);
+  g_object_get (edge_handle, "connect-type", &int_val, NULL);
+  EXPECT_EQ (0, int_val);
+
+  g_object_set (edge_handle, "topic", "TEMP_TEST_TOPIC", NULL);
+  g_object_get (edge_handle, "topic", &str_val, NULL);
+  EXPECT_STREQ ("TEMP_TEST_TOPIC", str_val);
+  g_free (str_val);
+
+  gst_object_unref (edge_handle);
+  gst_object_unref (gstpipe);
+  g_free (pipeline);
+}
+
+/**
+ * @brief Test for edgesrc with invalid host name.
+ */
+TEST (edgeSrc, properties2_n)
+{
+  gchar *pipeline;
+  GstElement *gstpipe;
+
+  /* Create a nnstreamer pipeline */
+  pipeline = g_strdup_printf (
+      "gst-launch-1.0 edgesrc host=f.a.i.l port=0 name=srcx ! "
+      "other/tensors,num_tensors=1,dimensions=3:320:240:1,types=uint8,format=static,framerate=30/1 ! "
+      "tensor_sink");
+  gstpipe = gst_parse_launch (pipeline, NULL);
+  EXPECT_NE (gstpipe, nullptr);
+
+  EXPECT_NE (setPipelineStateSync (gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+
+  gst_object_unref (gstpipe);
+  g_free (pipeline);
+}
+
+/**
+ * @brief Test data for edgesink/src (dimension 3:4:2)
+ */
+const gint test_frames[48] =
+     {1101, 1102, 1103, 1104, 1105, 1106, 1107, 1108, 1109, 1110, 1111, 1112,
+      1113, 1114, 1115, 1116, 1117, 1118, 1119, 1120, 1121, 1122, 1123, 1124,
+      1201, 1202, 1203, 1204, 1205, 1206, 1207, 1208, 1209, 1210, 1211, 1212,
+      1213, 1214, 1215, 1216, 1217, 1218, 1219, 1220, 1221, 1222, 1223, 1224 };
+
+/**
+ * @brief Callback for tensor sink signal.
+ */
+static void
+new_data_cb (GstElement *element, GstBuffer *buffer, gpointer user_data)
+{
+  GstMemory *mem_res;
+  GstMapInfo info_res;
+  gint *output, i;
+  gboolean ret;
+
+  g_critical ("[DEBUG] NEW DATA RECEIVED!");
+  data_received++;
+  mem_res = gst_buffer_get_memory (buffer, 0);
+  ret = gst_memory_map (mem_res, &info_res, GST_MAP_READ);
+  ASSERT_TRUE (ret);
+  output = (gint *)info_res.data;
+
+  for (i = 0; i < 48; i++) {
+    EXPECT_EQ (test_frames[i], output[i]);
+  }
+  gst_memory_unmap (mem_res, &info_res);
+  gst_memory_unref (mem_res);
+}
 
-/** @todo Add edgesrc and edgesink unit test */
 /**
- * @brief Test for edgesrc get and set properties
+ * @brief Test for edgesink and edgesrc.
  */
-TEST (Edge, sourceProperties0)
+TEST (edgeSinkSrc, runNormal)
 {
+  gchar *sink_pipeline, *src_pipeline;
+  GstElement *sink_gstpipe, *src_gstpipe;
+  GstElement *appsrc_handle, *sink_handle, *edge_handle;
+  guint port;
+  GstBuffer *buf;
+  GstMemory *mem;
+  GstMapInfo info;
+  int ret;
+
+  /* Create a nnstreamer pipeline */
+  port= get_available_port ();
+  sink_pipeline = g_strdup_printf (
+      "appsrc name=appsrc ! other/tensor,dimension=(string)3:4:2:2,type=(string)int32,framerate=(fraction)0/1 ! edgesink name=sinkx port=%u async=false", port);
+  sink_gstpipe = gst_parse_launch (sink_pipeline, NULL);
+  EXPECT_NE (sink_gstpipe, nullptr);
+
+  edge_handle = gst_bin_get_by_name (GST_BIN (sink_gstpipe), "sinkx");
+  EXPECT_NE (edge_handle, nullptr);
+  g_object_get (edge_handle, "port", &port, NULL);
+
+  appsrc_handle = gst_bin_get_by_name (GST_BIN (sink_gstpipe), "appsrc");
+  EXPECT_NE (appsrc_handle, nullptr);
+
+  src_pipeline = g_strdup_printf (
+      "gst-launch-1.0 edgesrc dest-port=%u name=srcx ! "
+      "other/tensor,dimension=(string)3:4:2:2,type=(string)int32,framerate=(fraction)0/1 ! "
+      "tensor_sink name=sinkx async=false", port);
+  src_gstpipe = gst_parse_launch (src_pipeline, NULL);
+  EXPECT_NE (src_gstpipe, nullptr);
+
+  sink_handle = gst_bin_get_by_name (GST_BIN (src_gstpipe), "sinkx");
+  EXPECT_NE (sink_handle, nullptr);
+
+  g_signal_connect (sink_handle, "new-data", (GCallback)new_data_cb, NULL);
+
+  buf = gst_buffer_new ();
+  mem = gst_allocator_alloc (NULL, 192, NULL);
+  ret = gst_memory_map (mem, &info, GST_MAP_WRITE);
+  ASSERT_TRUE (ret);
+  memcpy (info.data, test_frames, 192);
+  gst_memory_unmap (mem, &info);
+  gst_buffer_append_memory (buf, mem);
+  data_received = 0;
+
+  EXPECT_EQ (setPipelineStateSync (sink_gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+  g_usleep (1000000);
+
+  buf = gst_buffer_ref (buf);
+  EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (appsrc_handle), buf), GST_FLOW_OK);
+  g_usleep (100000);
+
+  EXPECT_EQ (setPipelineStateSync (src_gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+  g_usleep (100000);
+
+  EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (appsrc_handle), buf), GST_FLOW_OK);
+  g_usleep (100000);
+
+  gst_object_unref (src_gstpipe);
+  g_free (src_pipeline);
+
+  gst_object_unref (appsrc_handle);
+  gst_object_unref (edge_handle);
+  gst_object_unref (sink_handle);
+  gst_object_unref (sink_gstpipe);
+  g_free (sink_pipeline);
 }
 
+#ifdef ENABLE_AITT
+/**
+ * @brief Check whether MQTT broker is running or not.
+ */
+static bool
+_check_mqtt_broker ()
+{
+  int ret = 0;
+
+  ret = system ("ps aux | grep mosquitto | grep -v grep");
+  if (0 != ret) {
+    nns_logw ("MQTT broker is not running. Skip query hybrid test.");
+    return false;
+  }
+
+  return true;
+}
+
+/**
+ * @brief Test for edgesink and edgesrc using AITT.
+ */
+TEST (edgeSinkSrc, runNormalAitt)
+{
+  gchar *sink_pipeline, *src_pipeline;
+  GstElement *sink_gstpipe, *src_gstpipe;
+  GstElement *appsrc_handle, *sink_handle;
+  GstBuffer *buf;
+  GstMemory *mem;
+  GstMapInfo info;
+  int ret;
+
+  if (!_check_mqtt_broker ())
+    return;
+
+  /* Create a nnstreamer pipeline */
+  sink_pipeline = g_strdup_printf (
+      "appsrc name=appsrc ! other/tensor,dimension=(string)3:4:2:2,type=(string)int32,framerate=(fraction)0/1 ! edgesink name=sinkx port=0 connect-type=AITT dest-host=127.0.0.1 dest-port=1883 topic=tempTestTopic async=false");
+  sink_gstpipe = gst_parse_launch (sink_pipeline, NULL);
+  EXPECT_NE (sink_gstpipe, nullptr);
+
+  appsrc_handle = gst_bin_get_by_name (GST_BIN (sink_gstpipe), "appsrc");
+  EXPECT_NE (appsrc_handle, nullptr);
+
+  src_pipeline = g_strdup_printf (
+      "gst-launch-1.0 edgesrc connect-type=AITT dest-host=127.0.0.1 dest-port=1883 topic=tempTestTopic name=srcx ! "
+      "other/tensor,dimension=(string)3:4:2:2,type=(string)int32,framerate=(fraction)0/1 ! "
+      "tensor_sink name=sinkx async=false");
+  src_gstpipe = gst_parse_launch (src_pipeline, NULL);
+  EXPECT_NE (src_gstpipe, nullptr);
+
+  sink_handle = gst_bin_get_by_name (GST_BIN (src_gstpipe), "sinkx");
+  EXPECT_NE (sink_handle, nullptr);
+
+  g_signal_connect (sink_handle, "new-data", (GCallback)new_data_cb, NULL);
+
+  buf = gst_buffer_new ();
+  mem = gst_allocator_alloc (NULL, 192, NULL);
+  ret = gst_memory_map (mem, &info, GST_MAP_WRITE);
+  ASSERT_TRUE (ret);
+  memcpy (info.data, test_frames, 192);
+  gst_memory_unmap (mem, &info);
+  gst_buffer_append_memory (buf, mem);
+  data_received = 0;
+
+  EXPECT_EQ (setPipelineStateSync (sink_gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+  g_usleep (1000000);
+
+  buf = gst_buffer_ref (buf);
+  EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (appsrc_handle), buf), GST_FLOW_OK);
+  g_usleep (100000);
+
+  EXPECT_EQ (setPipelineStateSync (src_gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+  g_usleep (100000);
+
+  EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (appsrc_handle), buf), GST_FLOW_OK);
+  g_usleep (100000);
+
+  gst_object_unref (src_gstpipe);
+  g_free (src_pipeline);
+
+  gst_object_unref (appsrc_handle);
+  gst_object_unref (sink_handle);
+  gst_object_unref (sink_gstpipe);
+  g_free (sink_pipeline);
+}
+#endif
 /**
  * @brief Main GTest
  */
index b3e2e9b..b692250 100644 (file)
 #include <tensor_common.h>
 #include <unittest_util.h>
 #include "../gst/nnstreamer/tensor_query/tensor_query_common.h"
-#include <gio/gio.h>
-#include <netinet/tcp.h>
-#include <netinet/in.h>
-/**
- * @brief Get available port number.
- */
-static guint
-_get_available_port (void)
-{
-  struct sockaddr_in sin;
-  guint port = 0;
-  gint sock;
-  socklen_t len = sizeof (struct sockaddr);
-
-  sin.sin_family = AF_INET;
-  sin.sin_addr.s_addr = INADDR_ANY;
-  sin.sin_port = htons(0);
-
-  sock = socket (AF_INET, SOCK_STREAM, 0);
-  EXPECT_TRUE (sock > 0);
-  if (sock < 0)
-    return 0;
-
-  if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) {
-    if (getsockname (sock, (struct sockaddr *) &sin, &len) == 0) {
-      port = ntohs (sin.sin_port);
-    }
-  }
-  close (sock);
-
-  EXPECT_TRUE (port > 0);
-  return port;
-}
 
 /**
  * @brief Test for tensor_query_server get and set properties
@@ -61,7 +28,7 @@ TEST (tensorQuery, serverProperties0)
   gchar *str_val;
   guint src_port;
 
-  src_port = _get_available_port ();
+  src_port = get_available_port ();
 
   /* Create a nnstreamer pipeline */
   pipeline = g_strdup_printf (
@@ -167,7 +134,7 @@ TEST (tensorQuery, serverProperties2_n)
   GstElement *gstpipe;
   guint src_port;
 
-  src_port = _get_available_port ();
+  src_port = get_available_port ();
 
   /* Create a nnstreamer pipeline */
   pipeline = g_strdup_printf (
@@ -252,7 +219,7 @@ TEST (tensorQuery, serverRun)
   GstElement *gstpipe;
   guint src_port;
 
-  src_port = _get_available_port ();
+  src_port = get_available_port ();
 
   /* Create a nnstreamer pipeline */
   pipeline = g_strdup_printf (
index 88005bf..689305e 100644 (file)
@@ -170,6 +170,35 @@ replace_string (gchar * source, const gchar * what, const gchar * to,
   return result;
 }
 
+/**
+ * @brief Get available port number.
+ */
+guint
+get_available_port (void)
+{
+  struct sockaddr_in sin;
+  guint port = 0;
+  gint sock;
+  socklen_t len = sizeof (struct sockaddr);
+
+  sin.sin_family = AF_INET;
+  sin.sin_addr.s_addr = INADDR_ANY;
+  sin.sin_port = htons(0);
+
+  sock = socket (AF_INET, SOCK_STREAM, 0);
+  if (sock < 0)
+    return 0;
+
+  if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) {
+    if (getsockname (sock, (struct sockaddr *) &sin, &len) == 0) {
+      port = ntohs (sin.sin_port);
+    }
+  }
+  close (sock);
+
+  return port;
+}
+
 #ifdef FAKEDLOG
 /**
  * @brief Hijack dlog Tizen infra for unit testing to force printing out.
index 440d0ac..de798b6 100644 (file)
@@ -15,6 +15,9 @@
 #include <stdint.h>
 #include <errno.h>
 #include <glib/gstdio.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include <gio/gio.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -61,6 +64,12 @@ extern gchar *
 replace_string (gchar * source, const gchar * what, const gchar * to, const gchar * delimiters, guint * count);
 
 /**
+ * @brief Get available port number.
+ */
+extern guint
+get_available_port (void);
+
+/**
  * @brief Wait until the pipeline saving the file
  * @return TRUE on success, FALSE when a time-out occurs
  */