From 9bb82c93982a1609d34bb544a38ad2fd04c65006 Mon Sep 17 00:00:00 2001 From: hyunil park Date: Mon, 10 Jul 2023 15:37:18 +0900 Subject: [PATCH] [datarepo] Add function to read and write sparse tensors - Add writing sparse tensors to datareposink. So that datareposrc can append one memory to the GstBuffer per sparse tensor when reading a sample, the sample_offset, tensor_size, and tensor_count fields are added to the JSON file. - Add reading sparse tensors to datareposrc. If the tensor format in the JSON file is sparse, read a sample using the JSON meta information (sample_offset, tensor_size, tensor_count) and append as many memories to the GstBuffer as there are sparse tensors. - Add unit tests. - Fix a bug in shuffling the sample index. Signed-off-by: hyunil park --- gst/datarepo/gstdatareposink.c | 36 ++--- gst/datarepo/gstdatareposink.h | 4 +- gst/datarepo/gstdatareposrc.c | 28 ++-- gst/datarepo/gstdatareposrc.h | 2 +- tests/nnstreamer_datarepo/unittest_datareposink.cc | 129 +++++++++++++++++ tests/nnstreamer_datarepo/unittest_datareposrc.cc | 153 +++++++++++++++++++++ 6 files changed, 319 insertions(+), 33 deletions(-) diff --git a/gst/datarepo/gstdatareposink.c b/gst/datarepo/gstdatareposink.c index f709c28..f1c4ba1 100644 --- a/gst/datarepo/gstdatareposink.c +++ b/gst/datarepo/gstdatareposink.c @@ -39,7 +39,7 @@ /** * @brief Tensors caps */ -#define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible }") +#define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible, sparse }") /** * @brief Video caps */ @@ -178,11 +178,11 @@ gst_data_repo_sink_init (GstDataRepoSink * sink) sink->fd = 0; sink->fd_offset = 0; sink->data_type = GST_DATA_REPO_DATA_UNKNOWN; - sink->is_flexible_tensors = FALSE; + sink->is_static_tensors = FALSE; sink->fixed_caps = NULL; sink->json_object = NULL; sink->total_samples = 0; - sink->flexible_tensor_count = 0; + sink->cumulative_tensors = 0; sink->json_object = json_object_new (); sink->sample_offset_array = json_array_new 
(); @@ -298,10 +298,10 @@ gst_data_repo_sink_write_others (GstDataRepoSink * sink, GstBuffer * buffer) } /** - * @brief Function to write flexible tensors + * @brief Function to write flexible tensors or sparse tensors */ static GstFlowReturn -gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink, +gst_data_repo_sink_write_flexible_or_sparse_tensors (GstDataRepoSink * sink, GstBuffer * buffer) { guint num_tensors, i; @@ -331,7 +331,8 @@ gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink, } if (!gst_tensor_meta_info_parse_header (&meta, info.data)) { - GST_ERROR_OBJECT (sink, "Invalid flexible tensors"); + GST_ERROR_OBJECT (sink, + "Invalid format of tensors, the format is static."); goto error; } tensor_size = info.size; @@ -358,11 +359,10 @@ gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink, json_array_add_int_element (sink->sample_offset_array, sink->fd_offset); sink->fd_offset += total_write; - GST_LOG_OBJECT (sink, "flexible_tensor_count: %u", - sink->flexible_tensor_count); + GST_LOG_OBJECT (sink, "cumulative_tensors: %u", sink->cumulative_tensors); json_array_add_int_element (sink->tensor_count_array, - sink->flexible_tensor_count); - sink->flexible_tensor_count += num_tensors; + sink->cumulative_tensors); + sink->cumulative_tensors += num_tensors; sink->total_samples++; @@ -455,19 +455,21 @@ gst_data_repo_sink_render (GstBaseSink * bsink, GstBuffer * buffer) { GstDataRepoSink *sink = GST_DATA_REPO_SINK_CAST (bsink); - sink->is_flexible_tensors = - gst_tensor_pad_caps_is_flexible (GST_BASE_SINK_PAD (sink)); - switch (sink->data_type) { case GST_DATA_REPO_DATA_VIDEO: case GST_DATA_REPO_DATA_AUDIO: case GST_DATA_REPO_DATA_TEXT: case GST_DATA_REPO_DATA_OCTET: + return gst_data_repo_sink_write_others (sink, buffer); case GST_DATA_REPO_DATA_TENSOR: - if (sink->is_flexible_tensors) - return gst_data_repo_sink_write_flexible_tensors (sink, buffer); - /* default write function for tensors(fixed), video, audio, text and octet 
*/ + { + sink->is_static_tensors = + gst_tensor_pad_caps_is_static (GST_BASE_SINK_PAD (sink)); + if (!sink->is_static_tensors) + return gst_data_repo_sink_write_flexible_or_sparse_tensors (sink, + buffer); return gst_data_repo_sink_write_others (sink, buffer); + } case GST_DATA_REPO_DATA_IMAGE: return gst_data_repo_sink_write_multi_images (sink, buffer); default: @@ -719,7 +721,7 @@ gst_data_repo_sink_write_json_meta_file (GstDataRepoSink * sink) json_object_set_int_member (sink->json_object, "total_samples", sink->total_samples); - if (sink->is_flexible_tensors) { + if (sink->data_type == GST_DATA_REPO_DATA_TENSOR && !sink->is_static_tensors) { json_object_set_array_member (sink->json_object, "sample_offset", sink->sample_offset_array); json_object_set_array_member (sink->json_object, "tensor_size", diff --git a/gst/datarepo/gstdatareposink.h b/gst/datarepo/gstdatareposink.h index 45c327a..45e5d61 100644 --- a/gst/datarepo/gstdatareposink.h +++ b/gst/datarepo/gstdatareposink.h @@ -46,9 +46,9 @@ struct _GstDataRepoSink JsonArray *sample_offset_array; /**< offset array of sample */ JsonArray *tensor_size_array; /**< size array of flexible tensor */ JsonArray *tensor_count_array; /**< array for the number of cumulative tensors */ - guint flexible_tensor_count; + guint cumulative_tensors; /**< the number of cumulated tensors */ - gboolean is_flexible_tensors; + gboolean is_static_tensors; gint fd; /**< open file descriptor*/ GstDataRepoDataType data_type; /**< data type */ gint total_samples; /**< The number of total samples, in the case of multi-files, it is used as an index. 
*/ diff --git a/gst/datarepo/gstdatareposrc.c b/gst/datarepo/gstdatareposrc.c index 9816d54..07e69e4 100644 --- a/gst/datarepo/gstdatareposrc.c +++ b/gst/datarepo/gstdatareposrc.c @@ -253,7 +253,7 @@ gst_data_repo_src_init (GstDataRepoSrc * src) src->caps = NULL; src->sample_size = 0; src->need_changed_caps = FALSE; - src->is_flexible_tensors = FALSE; + src->is_static_tensors = FALSE; /* Filling the buffer should be pending until set_caps() */ gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME); @@ -429,7 +429,6 @@ gst_data_repo_src_shuffle_samples_index (GstDataRepoSrc * src) g_return_if_fail (src->shuffled_index_array != NULL); GST_LOG_OBJECT (src, "samples index are shuffled"); - /* Fisher-Yates algorithm */ /* The last index is the number of samples - 1. */ for (i = src->num_samples - 1; i > 0; i--) { @@ -438,8 +437,10 @@ gst_data_repo_src_shuffle_samples_index (GstDataRepoSrc * src) value_j = g_array_index (src->shuffled_index_array, guint, j); /* shuffled_index_array->data type is gchar * */ - *(src->shuffled_index_array->data + (sizeof (guint) * i)) = value_j; - *(src->shuffled_index_array->data + (sizeof (guint) * j)) = value_i; + *(guint *) (src->shuffled_index_array->data + (sizeof (guint) * i)) = + value_j; + *(guint *) (src->shuffled_index_array->data + (sizeof (guint) * j)) = + value_i; } for (i = 0; i < src->shuffled_index_array->len; i++) { @@ -632,10 +633,10 @@ gst_data_repo_src_get_num_tensors (GstDataRepoSrc * src, guint shuffled_index) } /** - * @brief Function to read tensors + * @brief Function to read flexible or sparse tensors */ static GstFlowReturn -gst_data_repo_src_read_flexible_tensors (GstDataRepoSrc * src, +gst_data_repo_src_read_flexible_or_sparse_tensors (GstDataRepoSrc * src, GstBuffer ** buffer) { GstFlowReturn ret = GST_FLOW_OK; @@ -1122,10 +1123,10 @@ gst_data_repo_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer) case GST_DATA_REPO_DATA_OCTET: return gst_data_repo_src_read_others (src, buffer); case 
GST_DATA_REPO_DATA_TENSOR: - if (src->is_flexible_tensors) - return gst_data_repo_src_read_flexible_tensors (src, buffer); - else + if (src->is_static_tensors) return gst_data_repo_src_read_tensors (src, buffer); + else + return gst_data_repo_src_read_flexible_or_sparse_tensors (src, buffer); case GST_DATA_REPO_DATA_IMAGE: return gst_data_repo_src_read_multi_images (src, buffer); default: @@ -1360,8 +1361,8 @@ gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps) s = gst_caps_get_structure (caps, 0); v = gst_structure_get_value (s, "format"); format = g_value_get_string (v); - if (g_strcmp0 (format, "flexible") == 0) - src->is_flexible_tensors = TRUE; + if (g_strcmp0 (format, "static") == 0) + src->is_static_tensors = TRUE; break; default: break; @@ -1449,7 +1450,7 @@ gst_data_repo_src_read_json_file (GstDataRepoSrc * src) GST_INFO_OBJECT (src, "sample_size: %d", src->sample_size); } - if (src->is_flexible_tensors) { + if (src->data_type == GST_DATA_REPO_DATA_TENSOR && !src->is_static_tensors) { if (!json_object_has_member (object, "sample_offset")) { GST_ERROR_OBJECT (src, "There is not sample_offset field: %s", contents); goto error; @@ -1645,7 +1646,8 @@ gst_data_repo_src_change_state (GstElement * element, GstStateChange transition) /** if data_type is not GST_DATA_REPO_DATA_UNKNOWN and sample_size is 0 then 'caps' is set by property and sample size needs to be set by blocksize (in the case of otect and text) */ - if (src->sample_size == 0 && !src->is_flexible_tensors) { + if (src->sample_size == 0 && (src->data_type == GST_DATA_REPO_DATA_OCTET + || src->data_type == GST_DATA_REPO_DATA_TEXT)) { basesrc = GST_BASE_SRC (src); g_object_get (G_OBJECT (basesrc), "blocksize", &blocksize, NULL); GST_DEBUG_OBJECT (src, "blocksize = %d", blocksize); diff --git a/gst/datarepo/gstdatareposrc.h b/gst/datarepo/gstdatareposrc.h index 74077c7..1782b2e 100644 --- a/gst/datarepo/gstdatareposrc.h +++ b/gst/datarepo/gstdatareposrc.h @@ -45,7 +45,7 @@ 
struct _GstDataRepoSrc { GstPushSrc parent; /**< parent object */ GstPad *src_pad; - gboolean is_flexible_tensors; + gboolean is_static_tensors; gboolean is_start; /**< check if datareposrc is started */ gboolean successful_read; /**< used for checking EOS when reading more than one images(multi-files) from a path */ gint fd; /**< open file descriptor */ diff --git a/tests/nnstreamer_datarepo/unittest_datareposink.cc b/tests/nnstreamer_datarepo/unittest_datareposink.cc index d0d4d90..bfc1024 100644 --- a/tests/nnstreamer_datarepo/unittest_datareposink.cc +++ b/tests/nnstreamer_datarepo/unittest_datareposink.cc @@ -364,6 +364,76 @@ TEST (datareposink, writeFlexibleTensors) g_remove ("flexible.json"); } + +/** + * @brief Test for writing sparse tensors + */ +TEST (datareposink, writeSparseTensors) +{ + GFile *file = NULL; + gchar *contents = NULL; + GstBus *bus; + GMainLoop *loop; + gchar *file_path = NULL; + gchar *json_path = NULL; + gboolean ret; + gint64 size, org_size = 31760; + GFileInfo *info = NULL; + + loop = g_main_loop_new (NULL, FALSE); + + file_path = get_file_path (filename); + json_path = get_file_path (json); + + gchar *str_pipeline = g_strdup_printf ( + "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! " + "tensor_sparse_enc ! other/tensors,format=sparse,framerate=0/1 ! 
" + "datareposink location=sparse.data json=sparse.json", + file_path, json_path); + + GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + g_free (str_pipeline); + ASSERT_NE (pipeline, nullptr); + + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + gst_object_unref (pipeline); + g_main_loop_unref (loop); + + /* Confirm file creation */ + file = g_file_new_for_path ("sparse.json"); + ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL); + g_free (contents); + g_object_unref (file); + g_free (file_path); + g_free (json_path); + ASSERT_EQ (ret, TRUE); + + /* Confirm file creation */ + file = g_file_new_for_path ("sparse.data"); + ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL); + info = g_file_query_info ( + file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL); + size = g_file_info_get_size (info); + g_object_unref (file); + g_object_unref (info); + g_free (contents); + ASSERT_EQ (ret, TRUE); + + /* The size of one mnist sample is 3176 bytes, the number of samples for test is 10. */ + EXPECT_LT (size, org_size); + + g_remove ("sparse.data"); + g_remove ("sparse.json"); +} + /** * @brief Test for writing a file with invalid param (JSON path) */ @@ -510,6 +580,65 @@ TEST (datareposink, writeFlexibleTensors_n) } /** + * @brief Test for writing sparse tensors with invalid (non-sparse) input data + */ +TEST (datareposink, writeSparseTensors_n) +{ + GFile *file = NULL; + GstBus *bus; + GMainLoop *loop; + GstElement *pipeline; + GFileInfo *file_info = NULL; + gint64 size = 0; + int i; + gchar *filename = NULL; + + create_image_test_file (); + + /* Insert non-sparse tensor data after negotiating with sparse caps. 
*/ + const gchar *str_pipeline + = "multifilesrc location=img_%02d.png ! other/tensors,format=sparse,framerate=0/1 ! " + "datareposink location=sparse.data json=sparse.json"; + + pipeline = gst_parse_launch (str_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); + + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + gst_object_unref (pipeline); + g_main_loop_unref (loop); + + /* Confirm file creation */ + file = g_file_new_for_path ("sparse.data"); + ASSERT_NE (file, nullptr); + file_info = g_file_query_info ( + file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL); + ASSERT_NE (file_info, nullptr); + size = g_file_info_get_size (file_info); + ASSERT_EQ (size, 0); + g_object_unref (file_info); + g_object_unref (file); + + for (i = 0; i < 5; i++) { + filename = g_strdup_printf ("img_%02d.png", i); + g_remove (filename); + g_free (filename); + } + + g_remove ("img.json"); + g_remove ("sparse.json"); + g_remove ("sparse.data"); +} + +/** * @brief Main GTest */ int diff --git a/tests/nnstreamer_datarepo/unittest_datareposrc.cc b/tests/nnstreamer_datarepo/unittest_datareposrc.cc index ea59fbd..8258da2 100644 --- a/tests/nnstreamer_datarepo/unittest_datareposrc.cc +++ b/tests/nnstreamer_datarepo/unittest_datareposrc.cc @@ -67,6 +67,40 @@ new_data_cb (GstElement *element, GstBuffer *buffer, gint *user_data) } /** + * @brief create sparse tensors file + */ +static void +create_sparse_tensors_test_file () +{ + GstBus *bus; + GMainLoop *loop; + gchar *file_path = get_file_path (filename); + gchar *json_path = get_file_path (json); + + const gchar *str_pipeline = g_strdup_printf ( + "datareposrc location=%s json=%s 
start-sample-index=0 stop-sample-index=9 ! " + "tensor_sparse_enc ! other/tensors,format=sparse,framerate=0/1 ! " + "datareposink location=sparse.data json=sparse.json", + file_path, json_path); + + GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); + + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + gst_object_unref (pipeline); + g_main_loop_unref (loop); +} + +/** * @brief create flexible tensors file */ static void @@ -556,6 +590,74 @@ TEST (datareposrc, readFlexibleTensors) } /** + * @brief Test for reading a file composed of sparse tensors + * the default shuffle is TRUE. + */ +TEST (datareposrc, readSparseTensors) +{ + GFile *file = NULL; + GFileInfo *info = NULL; + gint64 size, org_size = 31760; + gint buffer_count = 0; + GstElement *tensor_sink; + GstBus *bus; + const gchar *str_pipeline = NULL; + GMainLoop *loop; + GCallback handler = G_CALLBACK (new_data_cb); + str_pipeline + = "datareposrc location=sparse.data json=sparse.json ! tensor_sparse_dec ! " + "other/tensors, format=static, num_tensors=2, framerate=0/1, " + "dimensions=1:1:784:1.1:1:10:1, types=\"float32,float32\" ! tee name= t " + "t. ! queue ! filesink location=sample.data t. ! queue ! 
tensor_sink name=tensor_sink0"; + + create_sparse_tensors_test_file (); + GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); + + tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0"); + ASSERT_NE (tensor_sink, nullptr); + + g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count); + + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + EXPECT_NE (buffer_count, 0); + handler = NULL; + + file = g_file_new_for_path ("sparse.data"); + info = g_file_query_info ( + file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL); + size = g_file_info_get_size (info); + g_object_unref (file); + g_object_unref (info); + EXPECT_LT (size, org_size); + + file = g_file_new_for_path ("sample.data"); + info = g_file_query_info ( + file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL); + size = g_file_info_get_size (info); + g_object_unref (file); + g_object_unref (info); + EXPECT_EQ (size, org_size); + + gst_object_unref (tensor_sink); + gst_object_unref (pipeline); + g_main_loop_unref (loop); + + g_remove ("sparse.json"); + g_remove ("sparse.data"); + g_remove ("sample.data"); +} + +/** * @brief Test for reading a tensors file with Caps property */ TEST (datareposrc, readTensorsNoJSONWithCapsParam) @@ -918,6 +1020,57 @@ TEST (datareposrc, readInvalidFlexibleTensors) } /** + * @brief Test for reading a file composed of non-sparse tensors + * the default shuffle is TRUE. 
+ */ +TEST (datareposrc, readInvalidSparseTensors) +{ + gint buffer_count = 0; + GstBus *bus; + GMainLoop *loop; + GCallback handler = G_CALLBACK (new_data_cb); + const gchar *str_pipeline + = "datareposrc location=audio1.raw json=sparse.json ! tensor_sink name=tensor_sink0"; + GstElement *tensor_sink; + + create_sparse_tensors_test_file (); + create_audio_test_file (); + + GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); + + tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0"); + ASSERT_NE (tensor_sink, nullptr); + + g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count); + + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + /* EXPECT_EQ not checked due to internal data stream error */ + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + + /* Internal data stream error */ + EXPECT_EQ (buffer_count, 0); + handler = NULL; + + gst_object_unref (tensor_sink); + gst_object_unref (pipeline); + g_main_loop_unref (loop); + + g_remove ("audio1.json"); + g_remove ("audio1.raw"); + g_remove ("sparse.json"); + g_remove ("sparse.data"); +} + +/** * @brief Main GTest */ int -- 2.7.4