[datarepo] Add function to read and write sparse tensors accepted/tizen/unified/20230727.173058
authorhyunil park <hyunil46.park@samsung.com>
Mon, 10 Jul 2023 06:37:18 +0000 (15:37 +0900)
committerSangjung Woo <again4you@gmail.com>
Wed, 26 Jul 2023 07:54:24 +0000 (16:54 +0900)
- Add writing sparse tensors to datareposink
  To add memory to gstbuffer by number of sparse tensors when reading a sample in datareposrc,
  sample_offset, tensor_size, and tensor_count fields are added to the JSON file
- Add reading sparse tensors to datareposrc
  If the tensor format of JSON file is sparse, read a sample using the JSON meta information
  (sample_offset, tensor_size, tensor_count) and append memory to GstBuffer as many as the number
  of sparse tensors.
- Add unit test
- bug fix about shuffle index

Signed-off-by: hyunil park <hyunil46.park@samsung.com>
gst/datarepo/gstdatareposink.c
gst/datarepo/gstdatareposink.h
gst/datarepo/gstdatareposrc.c
gst/datarepo/gstdatareposrc.h
tests/nnstreamer_datarepo/unittest_datareposink.cc
tests/nnstreamer_datarepo/unittest_datareposrc.cc

index f709c28..f1c4ba1 100644 (file)
@@ -39,7 +39,7 @@
 /**
  * @brief Tensors caps
  */
-#define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible }")
+#define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible, sparse }")
 /**
  * @brief Video caps
  */
@@ -178,11 +178,11 @@ gst_data_repo_sink_init (GstDataRepoSink * sink)
   sink->fd = 0;
   sink->fd_offset = 0;
   sink->data_type = GST_DATA_REPO_DATA_UNKNOWN;
-  sink->is_flexible_tensors = FALSE;
+  sink->is_static_tensors = FALSE;
   sink->fixed_caps = NULL;
   sink->json_object = NULL;
   sink->total_samples = 0;
-  sink->flexible_tensor_count = 0;
+  sink->cumulative_tensors = 0;
   sink->json_object = json_object_new ();
   sink->sample_offset_array = json_array_new ();
   sink->tensor_size_array = json_array_new ();
@@ -298,10 +298,10 @@ gst_data_repo_sink_write_others (GstDataRepoSink * sink, GstBuffer * buffer)
 }
 
 /**
- * @brief Function to write flexible tensors
+ * @brief Function to write flexible tensors or sparse tensors
  */
 static GstFlowReturn
-gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink,
+gst_data_repo_sink_write_flexible_or_sparse_tensors (GstDataRepoSink * sink,
     GstBuffer * buffer)
 {
   guint num_tensors, i;
@@ -331,7 +331,8 @@ gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink,
     }
 
     if (!gst_tensor_meta_info_parse_header (&meta, info.data)) {
-      GST_ERROR_OBJECT (sink, "Invalid flexible tensors");
+      GST_ERROR_OBJECT (sink,
+          "Invalid format of tensors, the format is static.");
       goto error;
     }
     tensor_size = info.size;
@@ -358,11 +359,10 @@ gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink,
   json_array_add_int_element (sink->sample_offset_array, sink->fd_offset);
   sink->fd_offset += total_write;
 
-  GST_LOG_OBJECT (sink, "flexible_tensor_count: %u",
-      sink->flexible_tensor_count);
+  GST_LOG_OBJECT (sink, "cumulative_tensors: %u", sink->cumulative_tensors);
   json_array_add_int_element (sink->tensor_count_array,
-      sink->flexible_tensor_count);
-  sink->flexible_tensor_count += num_tensors;
+      sink->cumulative_tensors);
+  sink->cumulative_tensors += num_tensors;
 
   sink->total_samples++;
 
@@ -455,19 +455,21 @@ gst_data_repo_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
 {
   GstDataRepoSink *sink = GST_DATA_REPO_SINK_CAST (bsink);
 
-  sink->is_flexible_tensors =
-      gst_tensor_pad_caps_is_flexible (GST_BASE_SINK_PAD (sink));
-
   switch (sink->data_type) {
     case GST_DATA_REPO_DATA_VIDEO:
     case GST_DATA_REPO_DATA_AUDIO:
     case GST_DATA_REPO_DATA_TEXT:
     case GST_DATA_REPO_DATA_OCTET:
+      return gst_data_repo_sink_write_others (sink, buffer);
     case GST_DATA_REPO_DATA_TENSOR:
-      if (sink->is_flexible_tensors)
-        return gst_data_repo_sink_write_flexible_tensors (sink, buffer);
-      /* default write function for tensors(fixed), video, audio, text and octet */
+    {
+      sink->is_static_tensors =
+          gst_tensor_pad_caps_is_static (GST_BASE_SINK_PAD (sink));
+      if (!sink->is_static_tensors)
+        return gst_data_repo_sink_write_flexible_or_sparse_tensors (sink,
+            buffer);
       return gst_data_repo_sink_write_others (sink, buffer);
+    }
     case GST_DATA_REPO_DATA_IMAGE:
       return gst_data_repo_sink_write_multi_images (sink, buffer);
     default:
@@ -719,7 +721,7 @@ gst_data_repo_sink_write_json_meta_file (GstDataRepoSink * sink)
   json_object_set_int_member (sink->json_object, "total_samples",
       sink->total_samples);
 
-  if (sink->is_flexible_tensors) {
+  if (sink->data_type == GST_DATA_REPO_DATA_TENSOR && !sink->is_static_tensors) {
     json_object_set_array_member (sink->json_object, "sample_offset",
         sink->sample_offset_array);
     json_object_set_array_member (sink->json_object, "tensor_size",
index 45c327a..45e5d61 100644 (file)
@@ -46,9 +46,9 @@ struct _GstDataRepoSink
   JsonArray *sample_offset_array;   /**< offset array of sample */
   JsonArray *tensor_size_array;    /**< size array of flexible tensor */
   JsonArray *tensor_count_array;  /**< array for the number of cumulative tensors */
-  guint flexible_tensor_count;
+  guint cumulative_tensors;    /**< the number of cumulated tensors */
 
-  gboolean is_flexible_tensors;
+  gboolean is_static_tensors;
   gint fd;                        /**< open file descriptor*/
   GstDataRepoDataType data_type;  /**< data type */
   gint total_samples;             /**< The number of total samples, in the case of multi-files, it is used as an index. */
index 9816d54..07e69e4 100644 (file)
@@ -253,7 +253,7 @@ gst_data_repo_src_init (GstDataRepoSrc * src)
   src->caps = NULL;
   src->sample_size = 0;
   src->need_changed_caps = FALSE;
-  src->is_flexible_tensors = FALSE;
+  src->is_static_tensors = FALSE;
 
   /* Filling the buffer should be pending until set_caps() */
   gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
@@ -429,7 +429,6 @@ gst_data_repo_src_shuffle_samples_index (GstDataRepoSrc * src)
   g_return_if_fail (src->shuffled_index_array != NULL);
 
   GST_LOG_OBJECT (src, "samples index are shuffled");
-
   /* Fisher-Yates algorithm */
   /* The last index is the number of samples - 1. */
   for (i = src->num_samples - 1; i > 0; i--) {
@@ -438,8 +437,10 @@ gst_data_repo_src_shuffle_samples_index (GstDataRepoSrc * src)
     value_j = g_array_index (src->shuffled_index_array, guint, j);
 
     /* shuffled_index_array->data type is gchar * */
-    *(src->shuffled_index_array->data + (sizeof (guint) * i)) = value_j;
-    *(src->shuffled_index_array->data + (sizeof (guint) * j)) = value_i;
+    *(guint *) (src->shuffled_index_array->data + (sizeof (guint) * i)) =
+        value_j;
+    *(guint *) (src->shuffled_index_array->data + (sizeof (guint) * j)) =
+        value_i;
   }
 
   for (i = 0; i < src->shuffled_index_array->len; i++) {
@@ -632,10 +633,10 @@ gst_data_repo_src_get_num_tensors (GstDataRepoSrc * src, guint shuffled_index)
 }
 
 /**
- * @brief Function to read tensors
+ * @brief Function to read flexible or sparse tensors
  */
 static GstFlowReturn
-gst_data_repo_src_read_flexible_tensors (GstDataRepoSrc * src,
+gst_data_repo_src_read_flexible_or_sparse_tensors (GstDataRepoSrc * src,
     GstBuffer ** buffer)
 {
   GstFlowReturn ret = GST_FLOW_OK;
@@ -1122,10 +1123,10 @@ gst_data_repo_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer)
     case GST_DATA_REPO_DATA_OCTET:
       return gst_data_repo_src_read_others (src, buffer);
     case GST_DATA_REPO_DATA_TENSOR:
-      if (src->is_flexible_tensors)
-        return gst_data_repo_src_read_flexible_tensors (src, buffer);
-      else
+      if (src->is_static_tensors)
         return gst_data_repo_src_read_tensors (src, buffer);
+      else
+        return gst_data_repo_src_read_flexible_or_sparse_tensors (src, buffer);
     case GST_DATA_REPO_DATA_IMAGE:
       return gst_data_repo_src_read_multi_images (src, buffer);
     default:
@@ -1360,8 +1361,8 @@ gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps)
       s = gst_caps_get_structure (caps, 0);
       v = gst_structure_get_value (s, "format");
       format = g_value_get_string (v);
-      if (g_strcmp0 (format, "flexible") == 0)
-        src->is_flexible_tensors = TRUE;
+      if (g_strcmp0 (format, "static") == 0)
+        src->is_static_tensors = TRUE;
       break;
     default:
       break;
@@ -1449,7 +1450,7 @@ gst_data_repo_src_read_json_file (GstDataRepoSrc * src)
     GST_INFO_OBJECT (src, "sample_size: %d", src->sample_size);
   }
 
-  if (src->is_flexible_tensors) {
+  if (src->data_type == GST_DATA_REPO_DATA_TENSOR && !src->is_static_tensors) {
     if (!json_object_has_member (object, "sample_offset")) {
       GST_ERROR_OBJECT (src, "There is not sample_offset field: %s", contents);
       goto error;
@@ -1645,7 +1646,8 @@ gst_data_repo_src_change_state (GstElement * element, GstStateChange transition)
       /** if data_type is not GST_DATA_REPO_DATA_UNKNOWN and sample_size is 0 then
           'caps' is set by property and sample size needs to be set by blocksize
           (in the case of otect and text) */
-      if (src->sample_size == 0 && !src->is_flexible_tensors) {
+      if (src->sample_size == 0 && (src->data_type == GST_DATA_REPO_DATA_OCTET
+              || src->data_type == GST_DATA_REPO_DATA_TEXT)) {
         basesrc = GST_BASE_SRC (src);
         g_object_get (G_OBJECT (basesrc), "blocksize", &blocksize, NULL);
         GST_DEBUG_OBJECT (src, "blocksize = %d", blocksize);
index 74077c7..1782b2e 100644 (file)
@@ -45,7 +45,7 @@ struct _GstDataRepoSrc {
   GstPushSrc parent;            /**< parent object */
   GstPad *src_pad;
 
-  gboolean is_flexible_tensors;
+  gboolean is_static_tensors;
   gboolean is_start;            /**< check if datareposrc is started */
   gboolean successful_read;     /**< used for checking EOS when reading more than one images(multi-files) from a path */
   gint fd;                      /**< open file descriptor */
index d0d4d90..bfc1024 100644 (file)
@@ -364,6 +364,76 @@ TEST (datareposink, writeFlexibleTensors)
   g_remove ("flexible.json");
 }
 
+
+/**
+ * @brief Test for writing sparse tensors
+ */
+TEST (datareposink, writeSparseTensors)
+{
+  GFile *file = NULL;
+  gchar *contents = NULL;
+  GstBus *bus;
+  GMainLoop *loop;
+  gchar *file_path = NULL;
+  gchar *json_path = NULL;
+  gboolean ret;
+  gint64 size, org_size = 31760;
+  GFileInfo *info = NULL;
+
+  loop = g_main_loop_new (NULL, FALSE);
+
+  file_path = get_file_path (filename);
+  json_path = get_file_path (json);
+
+  gchar *str_pipeline = g_strdup_printf (
+      "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! "
+      "tensor_sparse_enc ! other/tensors,format=sparse,framerate=0/1 ! "
+      "datareposink location=sparse.data json=sparse.json",
+      file_path, json_path);
+
+  GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  g_free (str_pipeline);
+  ASSERT_NE (pipeline, nullptr);
+
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+
+  /* Confirm file creation */
+  file = g_file_new_for_path ("sparse.json");
+  ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL);
+  g_free (contents);
+  g_object_unref (file);
+  g_free (file_path);
+  g_free (json_path);
+  ASSERT_EQ (ret, TRUE);
+
+  /* Confirm file creation */
+  file = g_file_new_for_path ("sparse.data");
+  ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL);
+  info = g_file_query_info (
+      file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+  size = g_file_info_get_size (info);
+  g_object_unref (file);
+  g_object_unref (info);
+  g_free (contents);
+  ASSERT_EQ (ret, TRUE);
+
+  /* The size of one mnist sample is 3176 bytes, the number of samples for test is 10. */
+  EXPECT_LT (size, org_size);
+
+  g_remove ("sparse.data");
+  g_remove ("sparse.json");
+}
+
 /**
  * @brief Test for writing a file with invalid param (JSON path)
  */
@@ -510,6 +580,65 @@ TEST (datareposink, writeFlexibleTensors_n)
 }
 
 /**
+ * @brief Test for writing flexible tensors
+ */
+TEST (datareposink, writeSparseTensors_n)
+{
+  GFile *file = NULL;
+  GstBus *bus;
+  GMainLoop *loop;
+  GstElement *pipeline;
+  GFileInfo *file_info = NULL;
+  gint64 size = 0;
+  int i;
+  gchar *filename = NULL;
+
+  create_image_test_file ();
+
+  /* Insert non-Flexible Tensor data after negotiating with flexible caps. */
+  const gchar *str_pipeline
+      = "multifilesrc location=img_%02d.png ! other/tensors,format=sparse,framerate=0/1 ! "
+        "datareposink location=sparse.data json=sparse.json";
+
+  pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+
+  /* Confirm file creation */
+  file = g_file_new_for_path ("sparse.data");
+  ASSERT_NE (file, nullptr);
+  file_info = g_file_query_info (
+      file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+  ASSERT_NE (file_info, nullptr);
+  size = g_file_info_get_size (file_info);
+  ASSERT_EQ (size, 0);
+  g_object_unref (file_info);
+  g_object_unref (file);
+
+  for (i = 0; i < 5; i++) {
+    filename = g_strdup_printf ("img_%02d.png", i);
+    g_remove (filename);
+    g_free (filename);
+  }
+
+  g_remove ("img.json");
+  g_remove ("sparse.json");
+  g_remove ("sparse.data");
+}
+
+/**
  * @brief Main GTest
  */
 int
index ea59fbd..8258da2 100644 (file)
@@ -67,6 +67,40 @@ new_data_cb (GstElement *element, GstBuffer *buffer, gint *user_data)
 }
 
 /**
+ * @brief create sparse tensors file
+ */
+static void
+create_sparse_tensors_test_file ()
+{
+  GstBus *bus;
+  GMainLoop *loop;
+  gchar *file_path = get_file_path (filename);
+  gchar *json_path = get_file_path (json);
+
+  const gchar *str_pipeline = g_strdup_printf (
+      "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! "
+      "tensor_sparse_enc ! other/tensors,format=sparse,framerate=0/1 ! "
+      "datareposink location=sparse.data json=sparse.json",
+      file_path, json_path);
+
+  GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+}
+
+/**
  * @brief create flexible tensors file
  */
 static void
@@ -556,6 +590,74 @@ TEST (datareposrc, readFlexibleTensors)
 }
 
 /**
+ * @brief Test for reading a file composed of sparse tensors
+ * the default shuffle is TRUE.
+ */
+TEST (datareposrc, readSparseTensors)
+{
+  GFile *file = NULL;
+  GFileInfo *info = NULL;
+  gint64 size, org_size = 31760;
+  gint buffer_count = 0;
+  GstElement *tensor_sink;
+  GstBus *bus;
+  const gchar *str_pipeline = NULL;
+  GMainLoop *loop;
+  GCallback handler = G_CALLBACK (new_data_cb);
+  str_pipeline
+      = "datareposrc location=sparse.data json=sparse.json ! tensor_sparse_dec ! "
+        "other/tensors, format=static, num_tensors=2, framerate=0/1, "
+        "dimensions=1:1:784:1.1:1:10:1, types=\"float32,float32\" ! tee name= t "
+        "t. ! queue ! filesink location=sample.data t. ! queue ! tensor_sink name=tensor_sink0";
+
+  create_sparse_tensors_test_file ();
+  GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
+
+  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+  ASSERT_NE (tensor_sink, nullptr);
+
+  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  EXPECT_NE (buffer_count, 0);
+  handler = NULL;
+
+  file = g_file_new_for_path ("sparse.data");
+  info = g_file_query_info (
+      file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+  size = g_file_info_get_size (info);
+  g_object_unref (file);
+  g_object_unref (info);
+  EXPECT_LT (size, org_size);
+
+  file = g_file_new_for_path ("sample.data");
+  info = g_file_query_info (
+      file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+  size = g_file_info_get_size (info);
+  g_object_unref (file);
+  g_object_unref (info);
+  EXPECT_EQ (size, org_size);
+
+  gst_object_unref (tensor_sink);
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+
+  g_remove ("sparse.json");
+  g_remove ("sparse.data");
+  g_remove ("sample.data");
+}
+
+/**
  * @brief Test for reading a tensors file with Caps property
  */
 TEST (datareposrc, readTensorsNoJSONWithCapsParam)
@@ -918,6 +1020,57 @@ TEST (datareposrc, readInvalidFlexibleTensors)
 }
 
 /**
+ * @brief Test for reading a file composed of non-sparse tensors
+ * the default shuffle is TRUE.
+ */
+TEST (datareposrc, readInvalidSparseTensors)
+{
+  gint buffer_count = 0;
+  GstBus *bus;
+  GMainLoop *loop;
+  GCallback handler = G_CALLBACK (new_data_cb);
+  const gchar *str_pipeline
+      = "datareposrc location=audio1.raw json=sparse.json ! tensor_sink name=tensor_sink0";
+  GstElement *tensor_sink;
+
+  create_sparse_tensors_test_file ();
+  create_audio_test_file ();
+
+  GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
+
+  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+  ASSERT_NE (tensor_sink, nullptr);
+
+  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  /* EXPECT_EQ not checked due to internal data stream error */
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+  /* Internal data stream error */
+  EXPECT_EQ (buffer_count, 0);
+  handler = NULL;
+
+  gst_object_unref (tensor_sink);
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+
+  g_remove ("audio1.json");
+  g_remove ("audio1.raw");
+  g_remove ("sparse.json");
+  g_remove ("sparse.data");
+}
+
+/**
  * @brief Main GTest
  */
 int