PROP_HOST,
PROP_PORT,
+ PROP_DEST_HOST,
+ PROP_DEST_PORT,
PROP_CONNECT_TYPE,
+ PROP_TOPIC,
PROP_LAST
};
+#define DEFAULT_MQTT_HOST "tcp://localhost"
+#define DEFAULT_MQTT_PORT 1883
#define gst_edgesink_parent_class parent_class
G_DEFINE_TYPE (GstEdgeSink, gst_edgesink, GST_TYPE_BASE_SINK);
"The connections type between edgesink and edgesrc.",
GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_DEST_HOST,
+ g_param_spec_string ("dest-host", "Destination Host",
+ "The destination hostname of the broker", DEFAULT_MQTT_HOST,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_DEST_PORT,
+ g_param_spec_uint ("dest-port", "Destination Port",
+ "The destination port of the broker", 0,
+ 65535, DEFAULT_MQTT_PORT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_TOPIC,
+ g_param_spec_string ("topic", "Topic",
+ "The main topic of the host and option if necessary. "
+ "(topic)/(optional topic for main topic).", "",
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
{
self->host = g_strdup (DEFAULT_HOST);
self->port = DEFAULT_PORT;
+ self->dest_host = g_strdup (DEFAULT_HOST);
+ self->dest_port = DEFAULT_PORT;
+ self->topic = NULL;
self->connect_type = DEFAULT_CONNECT_TYPE;
}
case PROP_PORT:
gst_edgesink_set_port (self, g_value_get_uint (value));
break;
+ case PROP_DEST_HOST:
+ if (!g_value_get_string (value)) {
+ nns_logw ("dest host property cannot be NULL");
+ break;
+ }
+ g_free (self->dest_host);
+ self->dest_host = g_value_dup_string (value);
+ break;
+ case PROP_DEST_PORT:
+ self->dest_port = g_value_get_uint (value);
+ break;
case PROP_CONNECT_TYPE:
gst_edgesink_set_connect_type (self, g_value_get_enum (value));
break;
-
+ case PROP_TOPIC:
+ if (!g_value_get_string (value)) {
+ nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
+ break;
+ }
+ g_free (self->topic);
+ self->topic = g_value_dup_string (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_PORT:
g_value_set_uint (value, gst_edgesink_get_port (self));
break;
+ case PROP_DEST_HOST:
+ g_value_set_string (value, self->dest_host);
+ break;
+ case PROP_DEST_PORT:
+ g_value_set_uint (value, self->dest_port);
+ break;
case PROP_CONNECT_TYPE:
g_value_set_enum (value, gst_edgesink_get_connect_type (self));
break;
-
+ case PROP_TOPIC:
+ g_value_set_string (value, self->topic);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
char *port = NULL;
ret =
- nns_edge_create_handle ("TEMP_ID", self->connect_type,
+ nns_edge_create_handle ("TEMP_ID_EDGE_SINK", self->connect_type,
NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
if (NNS_EDGE_ERROR_NONE != ret) {
return FALSE;
}
- nns_edge_set_info (self->edge_h, "HOST", self->host);
- port = g_strdup_printf ("%d", self->port);
- nns_edge_set_info (self->edge_h, "PORT", port);
- g_free (port);
+ if (self->host)
+ nns_edge_set_info (self->edge_h, "HOST", self->host);
+ if (self->port > 0) {
+ port = g_strdup_printf ("%u", self->port);
+ nns_edge_set_info (self->edge_h, "PORT", port);
+ g_free (port);
+ }
+ if (self->dest_host)
+ nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
+ if (self->dest_port > 0) {
+ port = g_strdup_printf ("%u", self->dest_port);
+ nns_edge_set_info (self->edge_h, "DEST_PORT", port);
+ g_free (port);
+ }
+ if (self->topic)
+ nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
if (0 != nns_edge_start (self->edge_h)) {
nns_loge
enum
{
PROP_0,
-
+ PROP_HOST,
+ PROP_PORT,
PROP_DEST_HOST,
PROP_DEST_PORT,
PROP_CONNECT_TYPE,
+ PROP_TOPIC,
PROP_LAST
};
gobject_class->get_property = gst_edgesrc_get_property;
gobject_class->finalize = gst_edgesrc_class_finalize;
+ g_object_class_install_property (gobject_class, PROP_HOST,
+ g_param_spec_string ("host", "Host",
+ "A self host address", DEFAULT_HOST,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_PORT,
+ g_param_spec_uint ("port", "Port",
+ "A self port number.",
+ 0, 65535, DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_DEST_HOST,
g_param_spec_string ("dest-host", "Destination Host",
"A host address of edgesink to receive the packets from edgesink",
"The connections type between edgesink and edgesrc.",
GST_TYPE_EDGE_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_TOPIC,
+ g_param_spec_string ("topic", "Topic",
+ "The main topic of the host and option if necessary. "
+ "(topic)/(optional topic for main topic).", "",
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&srctemplate));
gst_base_src_set_format (basesrc, GST_FORMAT_TIME);
gst_base_src_set_async (basesrc, FALSE);
+ self->host = g_strdup (DEFAULT_HOST);
+ self->port = DEFAULT_PORT;
self->dest_host = g_strdup (DEFAULT_HOST);
self->dest_port = DEFAULT_PORT;
+ self->topic = NULL;
self->msg_queue = g_async_queue_new ();
self->connect_type = DEFAULT_CONNECT_TYPE;
}
GstEdgeSrc *self = GST_EDGESRC (object);
switch (prop_id) {
+ case PROP_HOST:
+ if (!g_value_get_string (value)) {
+ nns_logw ("host property cannot be NULL");
+ break;
+ }
+ g_free (self->host);
+ self->host = g_value_dup_string (value);
+ break;
+ case PROP_PORT:
+ self->port = g_value_get_uint (value);
+ break;
case PROP_DEST_HOST:
gst_edgesrc_set_dest_host (self, g_value_get_string (value));
break;
case PROP_CONNECT_TYPE:
gst_edgesrc_set_connect_type (self, g_value_get_enum (value));
break;
-
+ case PROP_TOPIC:
+ if (!g_value_get_string (value)) {
+ nns_logw ("topic property cannot be NULL. Query-hybrid is disabled.");
+ break;
+ }
+ g_free (self->topic);
+ self->topic = g_value_dup_string (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
GstEdgeSrc *self = GST_EDGESRC (object);
switch (prop_id) {
+ case PROP_HOST:
+ g_value_set_string (value, self->host);
+ break;
+ case PROP_PORT:
+ g_value_set_uint (value, self->port);
+ break;
case PROP_DEST_HOST:
g_value_set_string (value, gst_edgesrc_get_dest_host (self));
break;
case PROP_CONNECT_TYPE:
g_value_set_enum (value, gst_edgesrc_get_connect_type (self));
break;
-
+ case PROP_TOPIC:
+ g_value_set_string (value, self->topic);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
char *port = NULL;
ret =
- nns_edge_create_handle ("TEMP_ID", self->connect_type,
+ nns_edge_create_handle ("TEMP_ID_EDGE_SRC", self->connect_type,
NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
if (NNS_EDGE_ERROR_NONE != ret) {
return FALSE;
}
- nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
- port = g_strdup_printf ("%d", self->dest_port);
- nns_edge_set_info (self->edge_h, "DEST_PORT", port);
- g_free (port);
+ if (self->host)
+ nns_edge_set_info (self->edge_h, "HOST", self->host);
+ if (self->port > 0) {
+ port = g_strdup_printf ("%u", self->port);
+ nns_edge_set_info (self->edge_h, "PORT", port);
+ g_free (port);
+ }
+ if (self->dest_host)
+ nns_edge_set_info (self->edge_h, "DEST_HOST", self->dest_host);
+ if (self->dest_port > 0) {
+ port = g_strdup_printf ("%u", self->dest_port);
+ nns_edge_set_info (self->edge_h, "DEST_PORT", port);
+ g_free (port);
+ }
+ if (self->topic)
+ nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
#include <gtest/gtest.h>
#include <glib.h>
#include <gst/gst.h>
+#include "unittest_util.h"
+#include <gst/app/gstappsrc.h>
+#include "nnstreamer_log.h"
+
+static int data_received;
+
+/**
+ * @brief Test for edgesink get and set properties.
+ */
+TEST (edgeSink, properties0)
+{
+ gchar *pipeline;
+ GstElement *gstpipe;
+ GstElement *edge_handle;
+ gint int_val;
+ guint uint_val;
+ gchar *str_val;
+
+ /* Create a nnstreamer pipeline */
+ pipeline = g_strdup_printf (
+ "gst-launch-1.0 videotestsrc ! videoconvert ! videoscale ! "
+ "video/x-raw,width=320,height=240,format=RGB,framerate=10/1 ! "
+ "tensor_converter ! edgesink name=sinkx port=0");
+ gstpipe = gst_parse_launch (pipeline, NULL);
+ EXPECT_NE (gstpipe, nullptr);
+
+ edge_handle = gst_bin_get_by_name (GST_BIN (gstpipe), "sinkx");
+ EXPECT_NE (edge_handle, nullptr);
+
+ /* Set/Get properties of edgesink */
+ g_object_set (edge_handle, "host", "127.0.0.2", NULL);
+ g_object_get (edge_handle, "host", &str_val, NULL);
+ EXPECT_STREQ ("127.0.0.2", str_val);
+ g_free (str_val);
+
+ g_object_set (edge_handle, "port", 5001U, NULL);
+ g_object_get (edge_handle, "port", &uint_val, NULL);
+ EXPECT_EQ (5001U, uint_val);
+
+ g_object_set (edge_handle, "dest-host", "127.0.0.2", NULL);
+ g_object_get (edge_handle, "dest-host", &str_val, NULL);
+ EXPECT_STREQ ("127.0.0.2", str_val);
+ g_free (str_val);
+
+ g_object_set (edge_handle, "dest-port", 5001U, NULL);
+ g_object_get (edge_handle, "dest-port", &uint_val, NULL);
+ EXPECT_EQ (5001U, uint_val);
+
+ g_object_set (edge_handle, "connect-type", 0, NULL);
+ g_object_get (edge_handle, "connect-type", &int_val, NULL);
+ EXPECT_EQ (0, int_val);
+
+ g_object_set (edge_handle, "topic", "TEMP_TEST_TOPIC", NULL);
+ g_object_get (edge_handle, "topic", &str_val, NULL);
+ EXPECT_STREQ ("TEMP_TEST_TOPIC", str_val);
+ g_free (str_val);
+
+ gst_object_unref (edge_handle);
+ gst_object_unref (gstpipe);
+ g_free (pipeline);
+}
+
+/**
+ * @brief Test for edgesink with invalid host name.
+ */
+TEST (edgeSink, properties2_n)
+{
+ gchar *pipeline;
+ GstElement *gstpipe;
+
+ /* Create a nnstreamer pipeline */
+ pipeline = g_strdup_printf (
+ "gst-launch-1.0 videotestsrc ! videoconvert ! videoscale ! "
+ "video/x-raw,width=320,height=240,format=RGB,framerate=10/1 ! "
+ "tensor_converter ! edgesink host=f.a.i.l name=sinkx port=0");
+ gstpipe = gst_parse_launch (pipeline, NULL);
+ EXPECT_NE (gstpipe, nullptr);
+
+ EXPECT_NE (setPipelineStateSync (gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+
+ gst_object_unref (gstpipe);
+ g_free (pipeline);
+}
+
+/**
+ * @brief Test for edgesrc get and set properties.
+ */
+TEST (edgeSrc, properties0)
+{
+ gchar *pipeline;
+ GstElement *gstpipe;
+ GstElement *edge_handle;
+ gint int_val;
+ guint uint_val;
+ gchar *str_val;
+
+ /* Create a nnstreamer pipeline */
+ pipeline = g_strdup_printf (
+ "gst-launch-1.0 edgesrc port=0 name=srcx ! "
+ "other/tensors,num_tensors=1,dimensions=3:320:240:1,types=uint8,format=static,framerate=30/1 ! "
+ "tensor_sink");
+ gstpipe = gst_parse_launch (pipeline, NULL);
+ EXPECT_NE (gstpipe, nullptr);
+
+ edge_handle = gst_bin_get_by_name (GST_BIN (gstpipe), "srcx");
+ EXPECT_NE (edge_handle, nullptr);
+
+ /* Set/Get properties of edgesrc */
+ g_object_set (edge_handle, "host", "127.0.0.2", NULL);
+ g_object_get (edge_handle, "host", &str_val, NULL);
+ EXPECT_STREQ ("127.0.0.2", str_val);
+ g_free (str_val);
+
+ g_object_set (edge_handle, "port", 5001U, NULL);
+ g_object_get (edge_handle, "port", &uint_val, NULL);
+ EXPECT_EQ (5001U, uint_val);
+
+ g_object_set (edge_handle, "dest-host", "127.0.0.2", NULL);
+ g_object_get (edge_handle, "dest-host", &str_val, NULL);
+ EXPECT_STREQ ("127.0.0.2", str_val);
+ g_free (str_val);
+
+ g_object_set (edge_handle, "dest-port", 5001U, NULL);
+ g_object_get (edge_handle, "dest-port", &uint_val, NULL);
+ EXPECT_EQ (5001U, uint_val);
+
+ g_object_set (edge_handle, "connect-type", 0, NULL);
+ g_object_get (edge_handle, "connect-type", &int_val, NULL);
+ EXPECT_EQ (0, int_val);
+
+ g_object_set (edge_handle, "topic", "TEMP_TEST_TOPIC", NULL);
+ g_object_get (edge_handle, "topic", &str_val, NULL);
+ EXPECT_STREQ ("TEMP_TEST_TOPIC", str_val);
+ g_free (str_val);
+
+ gst_object_unref (edge_handle);
+ gst_object_unref (gstpipe);
+ g_free (pipeline);
+}
+
+/**
+ * @brief Test for edgesrc with invalid host name.
+ */
+TEST (edgeSrc, properties2_n)
+{
+ gchar *pipeline;
+ GstElement *gstpipe;
+
+ /* Create a nnstreamer pipeline */
+ pipeline = g_strdup_printf (
+ "gst-launch-1.0 edgesrc host=f.a.i.l port=0 name=srcx ! "
+ "other/tensors,num_tensors=1,dimensions=3:320:240:1,types=uint8,format=static,framerate=30/1 ! "
+ "tensor_sink");
+ gstpipe = gst_parse_launch (pipeline, NULL);
+ EXPECT_NE (gstpipe, nullptr);
+
+ EXPECT_NE (setPipelineStateSync (gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+
+ gst_object_unref (gstpipe);
+ g_free (pipeline);
+}
+
+/**
+ * @brief Test data for edgesink/src (dimension 3:4:2)
+ */
+const gint test_frames[48] =
+ {1101, 1102, 1103, 1104, 1105, 1106, 1107, 1108, 1109, 1110, 1111, 1112,
+ 1113, 1114, 1115, 1116, 1117, 1118, 1119, 1120, 1121, 1122, 1123, 1124,
+ 1201, 1202, 1203, 1204, 1205, 1206, 1207, 1208, 1209, 1210, 1211, 1212,
+ 1213, 1214, 1215, 1216, 1217, 1218, 1219, 1220, 1221, 1222, 1223, 1224 };
+
+/**
+ * @brief Callback for tensor sink signal.
+ */
+static void
+new_data_cb (GstElement *element, GstBuffer *buffer, gpointer user_data)
+{
+ GstMemory *mem_res;
+ GstMapInfo info_res;
+ gint *output, i;
+ gboolean ret;
+
+ g_critical ("[DEBUG] NEW DATA RECEIVED!");
+ data_received++;
+ mem_res = gst_buffer_get_memory (buffer, 0);
+ ret = gst_memory_map (mem_res, &info_res, GST_MAP_READ);
+ ASSERT_TRUE (ret);
+ output = (gint *)info_res.data;
+
+ for (i = 0; i < 48; i++) {
+ EXPECT_EQ (test_frames[i], output[i]);
+ }
+ gst_memory_unmap (mem_res, &info_res);
+ gst_memory_unref (mem_res);
+}
-/** @todo Add edgesrc and edgesink unit test */
/**
- * @brief Test for edgesrc get and set properties
+ * @brief Test for edgesink and edgesrc.
*/
-TEST (Edge, sourceProperties0)
+TEST (edgeSinkSrc, runNormal)
{
+ gchar *sink_pipeline, *src_pipeline;
+ GstElement *sink_gstpipe, *src_gstpipe;
+ GstElement *appsrc_handle, *sink_handle, *edge_handle;
+ guint port;
+ GstBuffer *buf;
+ GstMemory *mem;
+ GstMapInfo info;
+ int ret;
+
+ /* Create a nnstreamer pipeline */
+ port= get_available_port ();
+ sink_pipeline = g_strdup_printf (
+ "appsrc name=appsrc ! other/tensor,dimension=(string)3:4:2:2,type=(string)int32,framerate=(fraction)0/1 ! edgesink name=sinkx port=%u async=false", port);
+ sink_gstpipe = gst_parse_launch (sink_pipeline, NULL);
+ EXPECT_NE (sink_gstpipe, nullptr);
+
+ edge_handle = gst_bin_get_by_name (GST_BIN (sink_gstpipe), "sinkx");
+ EXPECT_NE (edge_handle, nullptr);
+ g_object_get (edge_handle, "port", &port, NULL);
+
+ appsrc_handle = gst_bin_get_by_name (GST_BIN (sink_gstpipe), "appsrc");
+ EXPECT_NE (appsrc_handle, nullptr);
+
+ src_pipeline = g_strdup_printf (
+ "gst-launch-1.0 edgesrc dest-port=%u name=srcx ! "
+ "other/tensor,dimension=(string)3:4:2:2,type=(string)int32,framerate=(fraction)0/1 ! "
+ "tensor_sink name=sinkx async=false", port);
+ src_gstpipe = gst_parse_launch (src_pipeline, NULL);
+ EXPECT_NE (src_gstpipe, nullptr);
+
+ sink_handle = gst_bin_get_by_name (GST_BIN (src_gstpipe), "sinkx");
+ EXPECT_NE (sink_handle, nullptr);
+
+ g_signal_connect (sink_handle, "new-data", (GCallback)new_data_cb, NULL);
+
+ buf = gst_buffer_new ();
+ mem = gst_allocator_alloc (NULL, 192, NULL);
+ ret = gst_memory_map (mem, &info, GST_MAP_WRITE);
+ ASSERT_TRUE (ret);
+ memcpy (info.data, test_frames, 192);
+ gst_memory_unmap (mem, &info);
+ gst_buffer_append_memory (buf, mem);
+ data_received = 0;
+
+ EXPECT_EQ (setPipelineStateSync (sink_gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ g_usleep (1000000);
+
+ buf = gst_buffer_ref (buf);
+ EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (appsrc_handle), buf), GST_FLOW_OK);
+ g_usleep (100000);
+
+ EXPECT_EQ (setPipelineStateSync (src_gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ g_usleep (100000);
+
+ EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (appsrc_handle), buf), GST_FLOW_OK);
+ g_usleep (100000);
+
+ gst_object_unref (src_gstpipe);
+ g_free (src_pipeline);
+
+ gst_object_unref (appsrc_handle);
+ gst_object_unref (edge_handle);
+ gst_object_unref (sink_handle);
+ gst_object_unref (sink_gstpipe);
+ g_free (sink_pipeline);
}
+#ifdef ENABLE_AITT
+/**
+ * @brief Check whether MQTT broker is running or not.
+ */
+static bool
+_check_mqtt_broker ()
+{
+ int ret = 0;
+
+ ret = system ("ps aux | grep mosquitto | grep -v grep");
+ if (0 != ret) {
+ nns_logw ("MQTT broker is not running. Skip query hybrid test.");
+ return false;
+ }
+
+ return true;
+}
+
+/**
+ * @brief Test for edgesink and edgesrc using AITT.
+ */
+TEST (edgeSinkSrc, runNormalAitt)
+{
+ gchar *sink_pipeline, *src_pipeline;
+ GstElement *sink_gstpipe, *src_gstpipe;
+ GstElement *appsrc_handle, *sink_handle;
+ GstBuffer *buf;
+ GstMemory *mem;
+ GstMapInfo info;
+ int ret;
+
+ if (!_check_mqtt_broker ())
+ return;
+
+ /* Create a nnstreamer pipeline */
+ sink_pipeline = g_strdup_printf (
+ "appsrc name=appsrc ! other/tensor,dimension=(string)3:4:2:2,type=(string)int32,framerate=(fraction)0/1 ! edgesink name=sinkx port=0 connect-type=AITT dest-host=127.0.0.1 dest-port=1883 topic=tempTestTopic async=false");
+ sink_gstpipe = gst_parse_launch (sink_pipeline, NULL);
+ EXPECT_NE (sink_gstpipe, nullptr);
+
+ appsrc_handle = gst_bin_get_by_name (GST_BIN (sink_gstpipe), "appsrc");
+ EXPECT_NE (appsrc_handle, nullptr);
+
+ src_pipeline = g_strdup_printf (
+ "gst-launch-1.0 edgesrc connect-type=AITT dest-host=127.0.0.1 dest-port=1883 topic=tempTestTopic name=srcx ! "
+ "other/tensor,dimension=(string)3:4:2:2,type=(string)int32,framerate=(fraction)0/1 ! "
+ "tensor_sink name=sinkx async=false");
+ src_gstpipe = gst_parse_launch (src_pipeline, NULL);
+ EXPECT_NE (src_gstpipe, nullptr);
+
+ sink_handle = gst_bin_get_by_name (GST_BIN (src_gstpipe), "sinkx");
+ EXPECT_NE (sink_handle, nullptr);
+
+ g_signal_connect (sink_handle, "new-data", (GCallback)new_data_cb, NULL);
+
+ buf = gst_buffer_new ();
+ mem = gst_allocator_alloc (NULL, 192, NULL);
+ ret = gst_memory_map (mem, &info, GST_MAP_WRITE);
+ ASSERT_TRUE (ret);
+ memcpy (info.data, test_frames, 192);
+ gst_memory_unmap (mem, &info);
+ gst_buffer_append_memory (buf, mem);
+ data_received = 0;
+
+ EXPECT_EQ (setPipelineStateSync (sink_gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ g_usleep (1000000);
+
+ buf = gst_buffer_ref (buf);
+ EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (appsrc_handle), buf), GST_FLOW_OK);
+ g_usleep (100000);
+
+ EXPECT_EQ (setPipelineStateSync (src_gstpipe, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
+ g_usleep (100000);
+
+ EXPECT_EQ (gst_app_src_push_buffer (GST_APP_SRC (appsrc_handle), buf), GST_FLOW_OK);
+ g_usleep (100000);
+
+ gst_object_unref (src_gstpipe);
+ g_free (src_pipeline);
+
+ gst_object_unref (appsrc_handle);
+ gst_object_unref (sink_handle);
+ gst_object_unref (sink_gstpipe);
+ g_free (sink_pipeline);
+}
+#endif
/**
* @brief Main GTest
*/