From 03192b1138d97ba709cc2c7d32fb85bc865e1d06 Mon Sep 17 00:00:00 2001 From: hyunil park Date: Tue, 18 Jul 2023 10:01:11 +0900 Subject: [PATCH] [datareposrc] Push GstBuffer according to framerate datareposrc push GstBuffer according to framerate when sink element sets sync=true - Set TIMESTAMP to GstBuffer - Add function to calculate running_time for one frame Signed-off-by: hyunil park --- gst/datarepo/gstdatareposrc.c | 68 +++++- gst/datarepo/gstdatareposrc.h | 4 + tests/nnstreamer_datarepo/unittest_datareposrc.cc | 256 ++++++++++++++++------ 3 files changed, 261 insertions(+), 67 deletions(-) diff --git a/gst/datarepo/gstdatareposrc.c b/gst/datarepo/gstdatareposrc.c index 07e69e4..dd50a48 100644 --- a/gst/datarepo/gstdatareposrc.c +++ b/gst/datarepo/gstdatareposrc.c @@ -254,6 +254,8 @@ gst_data_repo_src_init (GstDataRepoSrc * src) src->sample_size = 0; src->need_changed_caps = FALSE; src->is_static_tensors = FALSE; + src->n_frame = 0; + src->running_time = 0; /* Filling the buffer should be pending until set_caps() */ gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME); @@ -1099,11 +1101,50 @@ error_exit: } /** + * @brief Set timestamp + */ +static void +gst_data_repo_src_set_timestamp (GstDataRepoSrc * src, GstBuffer * buffer) +{ + + GstClockTime next_time; + GstClockTime duration = GST_CLOCK_TIME_NONE; + + g_return_if_fail (src != NULL); + g_return_if_fail (buffer != NULL); + + /* Unlike video, audio should control one sample by framerate. */ + if (src->data_type == GST_DATA_REPO_DATA_AUDIO) { + GST_WARNING_OBJECT (src, + "Use audiorate element for the framerate of audio"); + return; + } + + duration = gst_util_uint64_scale_int (GST_SECOND, src->rate_d, src->rate_n); + GST_BUFFER_DURATION (buffer) = duration; + GST_BUFFER_TIMESTAMP (buffer) = src->running_time; + gst_object_sync_values (GST_OBJECT (src), GST_BUFFER_TIMESTAMP (buffer)); + + next_time = + gst_util_uint64_scale (src->n_frame++, src->rate_d * GST_SECOND, + src->rate_n); + src->running_time = next_time; + + GST_LOG_OBJECT (src, "next_time %" GST_TIME_FORMAT "", + GST_TIME_ARGS (next_time)); + GST_LOG_OBJECT (src, + "timestamp [%" GST_TIME_FORMAT " dur %" GST_TIME_FORMAT "]", + GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), + GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); +} + +/** * @brief Function to create a buffer */ static GstFlowReturn gst_data_repo_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer) { + GstFlowReturn ret = GST_FLOW_OK; GstDataRepoSrc *src; src = GST_DATA_REPO_SRC (pushsrc); @@ -1121,17 +1162,28 @@ gst_data_repo_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer) case GST_DATA_REPO_DATA_AUDIO: case GST_DATA_REPO_DATA_TEXT: case GST_DATA_REPO_DATA_OCTET: - return gst_data_repo_src_read_others (src, buffer); + ret = gst_data_repo_src_read_others (src, buffer); + break; case GST_DATA_REPO_DATA_TENSOR: if (src->is_static_tensors) - return gst_data_repo_src_read_tensors (src, buffer); + ret = gst_data_repo_src_read_tensors (src, buffer); else - return gst_data_repo_src_read_flexible_or_sparse_tensors (src, buffer); + ret = gst_data_repo_src_read_flexible_or_sparse_tensors (src, buffer); + break; case GST_DATA_REPO_DATA_IMAGE: - return gst_data_repo_src_read_multi_images (src, buffer); + ret = gst_data_repo_src_read_multi_images (src, buffer); + break; default: return GST_FLOW_ERROR; } + + if (ret != GST_FLOW_OK) + return ret; + + if (src->rate_n) + gst_data_repo_src_set_timestamp (src, *buffer); + + return GST_FLOW_OK; } /** @@ -1348,6 +1400,7 @@ gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps) g_return_val_if_fail (caps != NULL, FALSE); src->data_type = gst_data_repo_get_data_type_from_caps (caps); + s = gst_caps_get_structure (caps, 0); switch (src->data_type) { case GST_DATA_REPO_DATA_VIDEO: @@ -1368,7 +1421,12 @@ gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps) break; } - GST_DEBUG_OBJECT (src, "data type: %d", src->data_type); + v = gst_structure_get_value (s, "framerate"); + src->rate_n = gst_value_get_fraction_numerator (v); + src->rate_d = gst_value_get_fraction_denominator (v); + GST_LOG_OBJECT (src, "framerate %d/%d", src->rate_n, src->rate_d); + GST_LOG_OBJECT (src, "data type: %d", src->data_type); + return (src->data_type != GST_DATA_REPO_DATA_UNKNOWN); } diff --git a/gst/datarepo/gstdatareposrc.h b/gst/datarepo/gstdatareposrc.h index 1782b2e..49002c8 100644 --- a/gst/datarepo/gstdatareposrc.h +++ b/gst/datarepo/gstdatareposrc.h @@ -89,6 +89,10 @@ struct _GstDataRepoSrc { guint sample_offset_array_len; guint tensor_size_array_len; guint tensor_count_array_len; + + GstClockTime running_time; /**< one frame running time */ + gint rate_n, rate_d; + guint64 n_frame; }; /** diff --git a/tests/nnstreamer_datarepo/unittest_datareposrc.cc b/tests/nnstreamer_datarepo/unittest_datareposrc.cc index 8258da2..276955e 100644 --- a/tests/nnstreamer_datarepo/unittest_datareposrc.cc +++ b/tests/nnstreamer_datarepo/unittest_datareposrc.cc @@ -62,7 +62,6 @@ static void new_data_cb (GstElement *element, GstBuffer *buffer, gint *user_data) { (*user_data)++; - g_warning ("count:%d", *user_data); return; } @@ -77,15 +76,15 @@ create_sparse_tensors_test_file () gchar *file_path = get_file_path (filename); gchar *json_path = get_file_path (json); - const gchar *str_pipeline = g_strdup_printf ( + gchar *str_pipeline = g_strdup_printf ( "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! " "tensor_sparse_enc ! other/tensors,format=sparse,framerate=0/1 ! " "datareposink location=sparse.data json=sparse.json", file_path, json_path); GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + g_free (str_pipeline); ASSERT_NE (pipeline, nullptr); - loop = g_main_loop_new (NULL, FALSE); bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); ASSERT_NE (bus, nullptr); @@ -104,21 +103,24 @@ create_sparse_tensors_test_file () * @brief create flexible tensors file */ static void -create_flexible_tensors_test_file () +create_flexible_tensors_test_file (gint fps) { GstBus *bus; GMainLoop *loop; - const gchar *str_pipeline - = "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! " - "video/x-raw,format=RGB,width=176,height=144,framerate=10/1 ! tensor_converter ! join0.sink_0 " - "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! " - "video/x-raw,format=RGB,width=320,height=240,framerate=10/1 ! tensor_converter ! join0.sink_1 " - "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! " - "video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tensor_converter ! join0.sink_2 " - "join name=join0 ! other/tensors,format=flexible ! " - "datareposink location=flexible.data json=flexible.json"; + gint rate_n = fps; + gchar *str_pipeline = g_strdup_printf ( + "videotestsrc num-buffers=10 ! videoconvert ! videoscale ! " + "video/x-raw,format=RGB,width=176,height=144,framerate=%d/1 ! tensor_converter ! join0.sink_0 " + "videotestsrc num-buffers=10 ! videoconvert ! videoscale ! " + "video/x-raw,format=RGB,width=320,height=240,framerate=%d/1 ! tensor_converter ! join0.sink_1 " + "videotestsrc num-buffers=10 ! videoconvert ! videoscale ! " + "video/x-raw,format=RGB,width=640,height=480,framerate=%d/1 ! tensor_converter ! join0.sink_2 " + "join name=join0 ! other/tensors,format=flexible ! " + "datareposink location=flexible.data json=flexible.json", + rate_n, rate_n, rate_n); GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + g_free (str_pipeline); ASSERT_NE (pipeline, nullptr); loop = g_main_loop_new (NULL, FALSE); @@ -320,24 +322,21 @@ TEST (datareposrc, readVideoRaw) */ TEST (datareposrc, readAudioRaw) { - gint buffer_count = 0; - GstElement *tensor_sink; + gchar *data_1 = NULL, *data_2 = NULL; + gsize size_1, size_2; GstBus *bus; GMainLoop *loop; - GCallback handler = G_CALLBACK (new_data_cb); + gint ret = -1; const gchar *str_pipeline - = "datareposrc location=audio1.raw json=audio1.json ! tensor_converter ! tensor_sink name=tensor_sink0"; + = "datareposrc location=audio1.raw json=audio1.json ! tee name=t " + "t. ! queue ! datareposink location=result.raw json=result.json " + "t. ! queue ! tensor_sink"; create_audio_test_file (); GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); ASSERT_NE (pipeline, nullptr); - tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0"); - ASSERT_NE (tensor_sink, nullptr); - - g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count); - loop = g_main_loop_new (NULL, FALSE); bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); ASSERT_NE (bus, nullptr); @@ -349,15 +348,36 @@ TEST (datareposrc, readAudioRaw) setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); - EXPECT_NE (buffer_count, 0); - handler = NULL; - - gst_object_unref (tensor_sink); gst_object_unref (pipeline); g_main_loop_unref (loop); + if (!g_file_get_contents ("aduio1.raw", &data_1, &size_1, NULL)) { + goto error; + } + + if (!g_file_get_contents ("result.raw", &data_2, &size_2, NULL)) { + goto error; + } + EXPECT_EQ (size_1, size_2); + g_free (data_1); + g_free (data_2); + + if (!g_file_get_contents ("audio1.json", &data_1, &size_1, NULL)) { + goto error; + } + + if (!g_file_get_contents ("result.json", &data_2, &size_2, NULL)) { + goto error; + } + ret = g_strcmp0 (data_1, data_2); + EXPECT_EQ (ret, 0); +error: + g_free (data_1); + g_free (data_2); g_remove ("audio1.json"); g_remove ("audio1.raw"); + g_remove ("result.json"); + g_remove ("result.raw"); } /** @@ -551,22 +571,106 @@ TEST (datareposrc, readTensors) */ TEST (datareposrc, readFlexibleTensors) { - gint buffer_count = 0; + gchar *data_1 = NULL, *data_2 = NULL; + gsize size_1, size_2; + gint fps = 10, ret = -1; + GstBus *bus; + const gchar *str_pipeline = NULL; + GMainLoop *loop; + str_pipeline = "datareposrc location=flexible.data json=flexible.json ! tee name=t " + "t. ! queue ! datareposink location=result.data json=result.json " + "t. ! queue ! tensor_sink"; + + create_flexible_tensors_test_file (fps); + GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); + + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + + gst_object_unref (pipeline); + g_main_loop_unref (loop); + + if (!g_file_get_contents ("flexible.raw", &data_1, &size_1, NULL)) { + goto error; + } + + if (!g_file_get_contents ("result.raw", &data_2, &size_2, NULL)) { + goto error; + } + EXPECT_EQ (size_1, size_2); + g_free (data_1); + g_free (data_2); + + if (!g_file_get_contents ("flexible.json", &data_1, &size_1, NULL)) { + goto error; + } + + if (!g_file_get_contents ("result.json", &data_2, &size_2, NULL)) { + goto error; + } + ret = g_strcmp0 (data_1, data_2); + EXPECT_EQ (ret, 0); +error: + g_free (data_1); + g_free (data_2); + g_remove ("flexible.json"); + g_remove ("flexible.data"); + g_remove ("result.json"); + g_remove ("result.data"); +} + + +/** + * @brief Framerate Test for reading a file composed of flexible tensors + */ +TEST (datareposrc, fps30ReadFlexibleTensors) +{ + gint fps = 30; + guint64 start_time, end_time; + gdouble elapsed_time; GstElement *tensor_sink; GstBus *bus; const gchar *str_pipeline = NULL; + const gchar *test_pipeline = "videotestsrc num-buffers=30 ! fakesink sync=true"; GMainLoop *loop; - GCallback handler = G_CALLBACK (new_data_cb); - str_pipeline = "datareposrc location=flexible.data json=flexible.json ! tensor_sink name=tensor_sink0"; + str_pipeline = "datareposrc location=flexible.data json=flexible.json ! queue ! tensor_sink name=tensor_sink0 sync=true"; - create_flexible_tensors_test_file (); + create_flexible_tensors_test_file (fps); GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); ASSERT_NE (pipeline, nullptr); - tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0"); - ASSERT_NE (tensor_sink, nullptr); + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); - g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count); + start_time = g_get_monotonic_time (); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + end_time = g_get_monotonic_time (); + elapsed_time = (end_time - start_time) / (double) G_USEC_PER_SEC; + + g_print ("Elapsed time: %.6f second\n", elapsed_time); + EXPECT_LT (0.8, elapsed_time); + + gst_object_unref (pipeline); + g_main_loop_unref (loop); + + pipeline = gst_parse_launch (str_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); loop = g_main_loop_new (NULL, FALSE); bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); @@ -574,17 +678,49 @@ TEST (datareposrc, readFlexibleTensors) gst_bus_add_watch (bus, bus_callback, loop); gst_object_unref (bus); + tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0"); + g_object_set (GST_OBJECT (tensor_sink), "sync", FALSE, NULL); + + start_time = g_get_monotonic_time (); + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); g_main_loop_run (loop); setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); - EXPECT_NE (buffer_count, 0); - handler = NULL; + end_time = g_get_monotonic_time (); + elapsed_time = (end_time - start_time) / (double) G_USEC_PER_SEC; + + g_print ("Elapsed time: %.6f second\n", elapsed_time); + EXPECT_LT (elapsed_time, 0.05); gst_object_unref (tensor_sink); gst_object_unref (pipeline); g_main_loop_unref (loop); + pipeline = gst_parse_launch (test_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); + + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + start_time = g_get_monotonic_time (); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + end_time = g_get_monotonic_time (); + elapsed_time = (end_time - start_time) / (double) G_USEC_PER_SEC; + + g_print ("Elapsed time: %.6f second\n", elapsed_time); + EXPECT_LT (0.8, elapsed_time); + + gst_object_unref (pipeline); + g_main_loop_unref (loop); + g_remove ("flexible.json"); g_remove ("flexible.data"); } @@ -595,20 +731,19 @@ TEST (datareposrc, readFlexibleTensors) */ TEST (datareposrc, readSparseTensors) { - GFile *file = NULL; - GFileInfo *info = NULL; - gint64 size, org_size = 31760; + gchar *data = NULL; + gsize size, org_size = 31760; gint buffer_count = 0; GstElement *tensor_sink; GstBus *bus; const gchar *str_pipeline = NULL; GMainLoop *loop; GCallback handler = G_CALLBACK (new_data_cb); - str_pipeline - = "datareposrc location=sparse.data json=sparse.json ! tensor_sparse_dec ! " - "other/tensors, format=static, num_tensors=2, framerate=0/1, " - "dimensions=1:1:784:1.1:1:10:1, types=\"float32,float32\" ! tee name= t " - "t. ! queue ! filesink location=sample.data t. ! queue ! tensor_sink name=tensor_sink0"; + str_pipeline = "datareposrc location=sparse.data json=sparse.json ! tensor_sparse_dec ! " + "other/tensors, format=static, num_tensors=2, framerate=0/1, " + "dimensions=1:1:784:1.1:1:10:1, types=\"float32,float32\" ! tee name= t " + "t. ! queue ! filesink location=sample.data " + "t. ! queue ! tensor_sink name=tensor_sink0"; create_sparse_tensors_test_file (); GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); @@ -632,26 +767,22 @@ TEST (datareposrc, readSparseTensors) EXPECT_NE (buffer_count, 0); handler = NULL; - file = g_file_new_for_path ("sparse.data"); - info = g_file_query_info ( - file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL); - size = g_file_info_get_size (info); - g_object_unref (file); - g_object_unref (info); - EXPECT_LT (size, org_size); - - file = g_file_new_for_path ("sample.data"); - info = g_file_query_info ( - file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL); - size = g_file_info_get_size (info); - g_object_unref (file); - g_object_unref (info); - EXPECT_EQ (size, org_size); - gst_object_unref (tensor_sink); gst_object_unref (pipeline); g_main_loop_unref (loop); + if (!g_file_get_contents ("sparse.data", &data, &size, NULL)) { + goto error; + } + EXPECT_LT (size, org_size); + g_free (data); + + if (!g_file_get_contents ("sample.data", &data, &size, NULL)) { + goto error; + } + EXPECT_EQ (size, org_size); +error: + g_free (data); g_remove ("sparse.json"); g_remove ("sparse.data"); g_remove ("sample.data"); @@ -972,9 +1103,10 @@ TEST (datareposrc, invalidTensorsSequence0_n) * @brief Test for reading a file composed of non-flexible tensors * the default shuffle is TRUE. */ -TEST (datareposrc, readInvalidFlexibleTensors) +TEST (datareposrc, readInvalidFlexibleTensors_n) { gint buffer_count = 0; + gint fps = 10; GstBus *bus; GMainLoop *loop; GCallback handler = G_CALLBACK (new_data_cb); @@ -982,7 +1114,7 @@ TEST (datareposrc, readInvalidFlexibleTensors) = "datareposrc location=audio1.raw json=flexible.json ! tensor_sink name=tensor_sink0"; GstElement *tensor_sink; - create_flexible_tensors_test_file (); + create_flexible_tensors_test_file (fps); create_audio_test_file (); GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); @@ -1023,7 +1155,7 @@ TEST (datareposrc, readInvalidFlexibleTensors) * @brief Test for reading a file composed of non-sparse tensors * the default shuffle is TRUE. */ -TEST (datareposrc, readInvalidSparseTensors) +TEST (datareposrc, readInvalidSparseTensors_n) { gint buffer_count = 0; GstBus *bus; -- 2.7.4