[datareposrc] Push GstBuffer according to framerate
authorhyunil park <hyunil46.park@samsung.com>
Tue, 18 Jul 2023 01:01:11 +0000 (10:01 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Thu, 3 Aug 2023 02:44:24 +0000 (11:44 +0900)
datareposrc push GstBuffer according to framerate when sink element sets sync=true

- Set TIMESTAMP to GstBuffer
- Add function to calculate running_time for one frame

Signed-off-by: hyunil park <hyunil46.park@samsung.com>
gst/datarepo/gstdatareposrc.c
gst/datarepo/gstdatareposrc.h
tests/nnstreamer_datarepo/unittest_datareposrc.cc

index 07e69e4..dd50a48 100644 (file)
@@ -254,6 +254,8 @@ gst_data_repo_src_init (GstDataRepoSrc * src)
   src->sample_size = 0;
   src->need_changed_caps = FALSE;
   src->is_static_tensors = FALSE;
+  src->n_frame = 0;
+  src->running_time = 0;
 
   /* Filling the buffer should be pending until set_caps() */
   gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
@@ -1099,11 +1101,50 @@ error_exit:
 }
 
 /**
+ * @brief Set timestamp
+ */
+static void
+gst_data_repo_src_set_timestamp (GstDataRepoSrc * src, GstBuffer * buffer)
+{
+
+  GstClockTime next_time;
+  GstClockTime duration = GST_CLOCK_TIME_NONE;
+
+  g_return_if_fail (src != NULL);
+  g_return_if_fail (buffer != NULL);
+
+  /* Unlike video, audio should control one sample by framerate. */
+  if (src->data_type == GST_DATA_REPO_DATA_AUDIO) {
+    GST_WARNING_OBJECT (src,
+        "Use audiorate element for the framerate of audio");
+    return;
+  }
+
+  duration = gst_util_uint64_scale_int (GST_SECOND, src->rate_d, src->rate_n);
+  GST_BUFFER_DURATION (buffer) = duration;
+  GST_BUFFER_TIMESTAMP (buffer) = src->running_time;
+  gst_object_sync_values (GST_OBJECT (src), GST_BUFFER_TIMESTAMP (buffer));
+
+  next_time =
+      gst_util_uint64_scale (src->n_frame++, src->rate_d * GST_SECOND,
+      src->rate_n);
+  src->running_time = next_time;
+
+  GST_LOG_OBJECT (src, "next_time %" GST_TIME_FORMAT "",
+      GST_TIME_ARGS (next_time));
+  GST_LOG_OBJECT (src,
+      "timestamp [%" GST_TIME_FORMAT " dur %" GST_TIME_FORMAT "]",
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
+      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
+}
+
+/**
  * @brief Function to create a buffer
  */
 static GstFlowReturn
 gst_data_repo_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer)
 {
+  GstFlowReturn ret = GST_FLOW_OK;
   GstDataRepoSrc *src;
   src = GST_DATA_REPO_SRC (pushsrc);
 
@@ -1121,17 +1162,28 @@ gst_data_repo_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer)
     case GST_DATA_REPO_DATA_AUDIO:
     case GST_DATA_REPO_DATA_TEXT:
     case GST_DATA_REPO_DATA_OCTET:
-      return gst_data_repo_src_read_others (src, buffer);
+      ret = gst_data_repo_src_read_others (src, buffer);
+      break;
     case GST_DATA_REPO_DATA_TENSOR:
       if (src->is_static_tensors)
-        return gst_data_repo_src_read_tensors (src, buffer);
+        ret = gst_data_repo_src_read_tensors (src, buffer);
       else
-        return gst_data_repo_src_read_flexible_or_sparse_tensors (src, buffer);
+        ret = gst_data_repo_src_read_flexible_or_sparse_tensors (src, buffer);
+      break;
     case GST_DATA_REPO_DATA_IMAGE:
-      return gst_data_repo_src_read_multi_images (src, buffer);
+      ret = gst_data_repo_src_read_multi_images (src, buffer);
+      break;
     default:
       return GST_FLOW_ERROR;
   }
+
+  if (ret != GST_FLOW_OK)
+    return ret;
+
+  if (src->rate_n)
+    gst_data_repo_src_set_timestamp (src, *buffer);
+
+  return GST_FLOW_OK;
 }
 
 /**
@@ -1348,6 +1400,7 @@ gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps)
   g_return_val_if_fail (caps != NULL, FALSE);
 
   src->data_type = gst_data_repo_get_data_type_from_caps (caps);
+  s = gst_caps_get_structure (caps, 0);
 
   switch (src->data_type) {
     case GST_DATA_REPO_DATA_VIDEO:
@@ -1368,7 +1421,12 @@ gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps)
       break;
   }
 
-  GST_DEBUG_OBJECT (src, "data type: %d", src->data_type);
+  v = gst_structure_get_value (s, "framerate");
+  src->rate_n = gst_value_get_fraction_numerator (v);
+  src->rate_d = gst_value_get_fraction_denominator (v);
+  GST_LOG_OBJECT (src, "framerate %d/%d", src->rate_n, src->rate_d);
+  GST_LOG_OBJECT (src, "data type: %d", src->data_type);
+
   return (src->data_type != GST_DATA_REPO_DATA_UNKNOWN);
 }
 
index 1782b2e..49002c8 100644 (file)
@@ -89,6 +89,10 @@ struct _GstDataRepoSrc {
   guint sample_offset_array_len;
   guint tensor_size_array_len;
   guint tensor_count_array_len;
+
+  GstClockTime running_time;    /**< one frame running time */
+  gint rate_n, rate_d;
+  guint64 n_frame;
 };
 
 /**
index 8258da2..276955e 100644 (file)
@@ -62,7 +62,6 @@ static void
 new_data_cb (GstElement *element, GstBuffer *buffer, gint *user_data)
 {
   (*user_data)++;
-  g_warning ("count:%d", *user_data);
   return;
 }
 
@@ -77,15 +76,15 @@ create_sparse_tensors_test_file ()
   gchar *file_path = get_file_path (filename);
   gchar *json_path = get_file_path (json);
 
-  const gchar *str_pipeline = g_strdup_printf (
+  gchar *str_pipeline = g_strdup_printf (
       "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! "
       "tensor_sparse_enc ! other/tensors,format=sparse,framerate=0/1 ! "
       "datareposink location=sparse.data json=sparse.json",
       file_path, json_path);
 
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
-
   loop = g_main_loop_new (NULL, FALSE);
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
   ASSERT_NE (bus, nullptr);
@@ -104,21 +103,24 @@ create_sparse_tensors_test_file ()
  * @brief create flexible tensors file
  */
 static void
-create_flexible_tensors_test_file ()
+create_flexible_tensors_test_file (gint fps)
 {
   GstBus *bus;
   GMainLoop *loop;
-  const gchar *str_pipeline
-      = "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
-        "video/x-raw,format=RGB,width=176,height=144,framerate=10/1 ! tensor_converter ! join0.sink_0 "
-        "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
-        "video/x-raw,format=RGB,width=320,height=240,framerate=10/1 ! tensor_converter ! join0.sink_1 "
-        "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
-        "video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tensor_converter ! join0.sink_2 "
-        "join name=join0 ! other/tensors,format=flexible ! "
-        "datareposink location=flexible.data json=flexible.json";
+  gint rate_n = fps;
+  gchar *str_pipeline = g_strdup_printf (
+      "videotestsrc num-buffers=10 ! videoconvert ! videoscale ! "
+      "video/x-raw,format=RGB,width=176,height=144,framerate=%d/1 ! tensor_converter ! join0.sink_0 "
+      "videotestsrc num-buffers=10 ! videoconvert ! videoscale ! "
+      "video/x-raw,format=RGB,width=320,height=240,framerate=%d/1 ! tensor_converter ! join0.sink_1 "
+      "videotestsrc num-buffers=10 ! videoconvert ! videoscale ! "
+      "video/x-raw,format=RGB,width=640,height=480,framerate=%d/1 ! tensor_converter ! join0.sink_2 "
+      "join name=join0 ! other/tensors,format=flexible ! "
+      "datareposink location=flexible.data json=flexible.json",
+      rate_n, rate_n, rate_n);
 
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
   loop = g_main_loop_new (NULL, FALSE);
@@ -320,24 +322,21 @@ TEST (datareposrc, readVideoRaw)
  */
 TEST (datareposrc, readAudioRaw)
 {
-  gint buffer_count = 0;
-  GstElement *tensor_sink;
+  gchar *data_1 = NULL, *data_2 = NULL;
+  gsize size_1, size_2;
   GstBus *bus;
   GMainLoop *loop;
-  GCallback handler = G_CALLBACK (new_data_cb);
+  gint ret = -1;
   const gchar *str_pipeline
-      = "datareposrc location=audio1.raw json=audio1.json ! tensor_converter ! tensor_sink name=tensor_sink0";
+      = "datareposrc location=audio1.raw json=audio1.json ! tee name=t "
+        "t. ! queue ! datareposink location=result.raw json=result.json "
+        "t. ! queue ! tensor_sink";
 
   create_audio_test_file ();
 
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
   ASSERT_NE (pipeline, nullptr);
 
-  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
-  ASSERT_NE (tensor_sink, nullptr);
-
-  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
-
   loop = g_main_loop_new (NULL, FALSE);
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
   ASSERT_NE (bus, nullptr);
@@ -349,15 +348,36 @@ TEST (datareposrc, readAudioRaw)
 
   setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
 
-  EXPECT_NE (buffer_count, 0);
-  handler = NULL;
-
-  gst_object_unref (tensor_sink);
   gst_object_unref (pipeline);
   g_main_loop_unref (loop);
 
+  if (!g_file_get_contents ("aduio1.raw", &data_1, &size_1, NULL)) {
+    goto error;
+  }
+
+  if (!g_file_get_contents ("result.raw", &data_2, &size_2, NULL)) {
+    goto error;
+  }
+  EXPECT_EQ (size_1, size_2);
+  g_free (data_1);
+  g_free (data_2);
+
+  if (!g_file_get_contents ("audio1.json", &data_1, &size_1, NULL)) {
+    goto error;
+  }
+
+  if (!g_file_get_contents ("result.json", &data_2, &size_2, NULL)) {
+    goto error;
+  }
+  ret = g_strcmp0 (data_1, data_2);
+  EXPECT_EQ (ret, 0);
+error:
+  g_free (data_1);
+  g_free (data_2);
   g_remove ("audio1.json");
   g_remove ("audio1.raw");
+  g_remove ("result.json");
+  g_remove ("result.raw");
 }
 
 /**
@@ -551,22 +571,106 @@ TEST (datareposrc, readTensors)
  */
 TEST (datareposrc, readFlexibleTensors)
 {
-  gint buffer_count = 0;
+  gchar *data_1 = NULL, *data_2 = NULL;
+  gsize size_1, size_2;
+  gint fps = 10, ret = -1;
+  GstBus *bus;
+  const gchar *str_pipeline = NULL;
+  GMainLoop *loop;
+  str_pipeline = "datareposrc location=flexible.data json=flexible.json ! tee name=t "
+                 "t. ! queue ! datareposink location=result.data json=result.json "
+                 "t. ! queue ! tensor_sink";
+
+  create_flexible_tensors_test_file (fps);
+  GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+
+  if (!g_file_get_contents ("flexible.raw", &data_1, &size_1, NULL)) {
+    goto error;
+  }
+
+  if (!g_file_get_contents ("result.raw", &data_2, &size_2, NULL)) {
+    goto error;
+  }
+  EXPECT_EQ (size_1, size_2);
+  g_free (data_1);
+  g_free (data_2);
+
+  if (!g_file_get_contents ("flexible.json", &data_1, &size_1, NULL)) {
+    goto error;
+  }
+
+  if (!g_file_get_contents ("result.json", &data_2, &size_2, NULL)) {
+    goto error;
+  }
+  ret = g_strcmp0 (data_1, data_2);
+  EXPECT_EQ (ret, 0);
+error:
+  g_free (data_1);
+  g_free (data_2);
+  g_remove ("flexible.json");
+  g_remove ("flexible.data");
+  g_remove ("result.json");
+  g_remove ("result.data");
+}
+
+
+/**
+ * @brief Framerate Test for reading a file composed of flexible tensors
+ */
+TEST (datareposrc, fps30ReadFlexibleTensors)
+{
+  gint fps = 30;
+  guint64 start_time, end_time;
+  gdouble elapsed_time;
   GstElement *tensor_sink;
   GstBus *bus;
   const gchar *str_pipeline = NULL;
+  const gchar *test_pipeline = "videotestsrc num-buffers=30 ! fakesink sync=true";
   GMainLoop *loop;
-  GCallback handler = G_CALLBACK (new_data_cb);
-  str_pipeline = "datareposrc location=flexible.data json=flexible.json ! tensor_sink name=tensor_sink0";
+  str_pipeline = "datareposrc location=flexible.data json=flexible.json ! queue ! tensor_sink name=tensor_sink0 sync=true";
 
-  create_flexible_tensors_test_file ();
+  create_flexible_tensors_test_file (fps);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
   ASSERT_NE (pipeline, nullptr);
 
-  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
-  ASSERT_NE (tensor_sink, nullptr);
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
 
-  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+  start_time = g_get_monotonic_time ();
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  end_time = g_get_monotonic_time ();
+  elapsed_time = (end_time - start_time) / (double) G_USEC_PER_SEC;
+
+  g_print ("Elapsed time: %.6f second\n", elapsed_time);
+  EXPECT_LT (0.8, elapsed_time);
+
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+
+  pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
 
   loop = g_main_loop_new (NULL, FALSE);
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
@@ -574,17 +678,49 @@ TEST (datareposrc, readFlexibleTensors)
   gst_bus_add_watch (bus, bus_callback, loop);
   gst_object_unref (bus);
 
+  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+  g_object_set (GST_OBJECT (tensor_sink), "sync", FALSE, NULL);
+
+  start_time = g_get_monotonic_time ();
+
   setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
   g_main_loop_run (loop);
 
   setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
-  EXPECT_NE (buffer_count, 0);
-  handler = NULL;
+  end_time = g_get_monotonic_time ();
+  elapsed_time = (end_time - start_time) / (double) G_USEC_PER_SEC;
+
+  g_print ("Elapsed time: %.6f second\n", elapsed_time);
+  EXPECT_LT (elapsed_time, 0.05);
 
   gst_object_unref (tensor_sink);
   gst_object_unref (pipeline);
   g_main_loop_unref (loop);
 
+  pipeline = gst_parse_launch (test_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  start_time = g_get_monotonic_time ();
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  end_time = g_get_monotonic_time ();
+  elapsed_time = (end_time - start_time) / (double) G_USEC_PER_SEC;
+
+  g_print ("Elapsed time: %.6f second\n", elapsed_time);
+  EXPECT_LT (0.8, elapsed_time);
+
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+
   g_remove ("flexible.json");
   g_remove ("flexible.data");
 }
@@ -595,20 +731,19 @@ TEST (datareposrc, readFlexibleTensors)
  */
 TEST (datareposrc, readSparseTensors)
 {
-  GFile *file = NULL;
-  GFileInfo *info = NULL;
-  gint64 size, org_size = 31760;
+  gchar *data = NULL;
+  gsize size, org_size = 31760;
   gint buffer_count = 0;
   GstElement *tensor_sink;
   GstBus *bus;
   const gchar *str_pipeline = NULL;
   GMainLoop *loop;
   GCallback handler = G_CALLBACK (new_data_cb);
-  str_pipeline
-      = "datareposrc location=sparse.data json=sparse.json ! tensor_sparse_dec ! "
-        "other/tensors, format=static, num_tensors=2, framerate=0/1, "
-        "dimensions=1:1:784:1.1:1:10:1, types=\"float32,float32\" ! tee name= t "
-        "t. ! queue ! filesink location=sample.data t. ! queue ! tensor_sink name=tensor_sink0";
+  str_pipeline = "datareposrc location=sparse.data json=sparse.json ! tensor_sparse_dec ! "
+                 "other/tensors, format=static, num_tensors=2, framerate=0/1, "
+                 "dimensions=1:1:784:1.1:1:10:1, types=\"float32,float32\" ! tee name= t "
+                 "t. ! queue ! filesink location=sample.data "
+                 "t. ! queue ! tensor_sink name=tensor_sink0";
 
   create_sparse_tensors_test_file ();
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
@@ -632,26 +767,22 @@ TEST (datareposrc, readSparseTensors)
   EXPECT_NE (buffer_count, 0);
   handler = NULL;
 
-  file = g_file_new_for_path ("sparse.data");
-  info = g_file_query_info (
-      file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
-  size = g_file_info_get_size (info);
-  g_object_unref (file);
-  g_object_unref (info);
-  EXPECT_LT (size, org_size);
-
-  file = g_file_new_for_path ("sample.data");
-  info = g_file_query_info (
-      file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
-  size = g_file_info_get_size (info);
-  g_object_unref (file);
-  g_object_unref (info);
-  EXPECT_EQ (size, org_size);
-
   gst_object_unref (tensor_sink);
   gst_object_unref (pipeline);
   g_main_loop_unref (loop);
 
+  if (!g_file_get_contents ("sparse.data", &data, &size, NULL)) {
+    goto error;
+  }
+  EXPECT_LT (size, org_size);
+  g_free (data);
+
+  if (!g_file_get_contents ("sample.data", &data, &size, NULL)) {
+    goto error;
+  }
+  EXPECT_EQ (size, org_size);
+error:
+  g_free (data);
   g_remove ("sparse.json");
   g_remove ("sparse.data");
   g_remove ("sample.data");
@@ -972,9 +1103,10 @@ TEST (datareposrc, invalidTensorsSequence0_n)
  * @brief Test for reading a file composed of non-flexible tensors
  * the default shuffle is TRUE.
  */
-TEST (datareposrc, readInvalidFlexibleTensors)
+TEST (datareposrc, readInvalidFlexibleTensors_n)
 {
   gint buffer_count = 0;
+  gint fps = 10;
   GstBus *bus;
   GMainLoop *loop;
   GCallback handler = G_CALLBACK (new_data_cb);
@@ -982,7 +1114,7 @@ TEST (datareposrc, readInvalidFlexibleTensors)
       = "datareposrc location=audio1.raw json=flexible.json ! tensor_sink name=tensor_sink0";
   GstElement *tensor_sink;
 
-  create_flexible_tensors_test_file ();
+  create_flexible_tensors_test_file (fps);
   create_audio_test_file ();
 
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
@@ -1023,7 +1155,7 @@ TEST (datareposrc, readInvalidFlexibleTensors)
  * @brief Test for reading a file composed of non-sparse tensors
  * the default shuffle is TRUE.
  */
-TEST (datareposrc, readInvalidSparseTensors)
+TEST (datareposrc, readInvalidSparseTensors_n)
 {
   gint buffer_count = 0;
   GstBus *bus;