From: hyunil park Date: Tue, 20 Jun 2023 10:19:48 +0000 (+0900) Subject: [datareposink] Add function to write flexible tensors X-Git-Tag: accepted/tizen/unified/20230710.013136~17 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=50dcabd7d68781a40904e0e1c540c47e963990d2;p=platform%2Fupstream%2Fnnstreamer.git [datareposink] Add function to write flexible tensors - To add memory to gstbuffer by number of flexible tensors when read sample in datareposrc, sample_offset, tensor_size, and tensor_count field are added to the JSON file - Add unit test - Reference * If caps of sink pad is flexible, the input gstBuffer is saved as a flexible tensor * The size of the input gstBuffer is stored as sample_offset field in JSON file and it will be used when shuffle operation in datareposrc * Each flexible tensor size is stored as tensor_size field in JSON file and it will be used with gst_buffer_append_memory() in datareposrc Signed-off-by: hyunil park --- diff --git a/gst/datarepo/gstdatareposink.c b/gst/datarepo/gstdatareposink.c index 98a34cd..2a0c28e 100644 --- a/gst/datarepo/gstdatareposink.c +++ b/gst/datarepo/gstdatareposink.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include "gstdatareposink.h" @@ -175,15 +176,17 @@ gst_data_repo_sink_init (GstDataRepoSink * sink) { sink->filename = NULL; sink->fd = 0; - sink->offset = 0; + sink->fd_offset = 0; sink->data_type = GST_DATA_REPO_DATA_UNKNOWN; sink->is_flexible_tensors = FALSE; sink->fixed_caps = NULL; sink->json_object = NULL; sink->total_samples = 0; - /* init here for flexible tensor */ + sink->flexible_tensor_count = 0; sink->json_object = json_object_new (); - sink->json_offset_array = json_array_new (); + sink->sample_offset_array = json_array_new (); + sink->tensor_size_array = json_array_new (); + sink->tensor_count_array = json_array_new (); } /** @@ -197,6 +200,11 @@ gst_data_repo_sink_finalize (GObject * object) g_free (sink->filename); g_free (sink->json_filename); + if (sink->fd) { + g_close (sink->fd, NULL); + sink->fd = 0; + } + /* Check for gst-inspect log */ if (sink->fixed_caps) gst_caps_unref (sink->fixed_caps); @@ -271,7 +279,7 @@ gst_data_repo_sink_write_others (GstDataRepoSink * sink, GstBuffer * buffer) GST_LOG_OBJECT (sink, "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)", - (long long) info.size, sink->offset, (long long) sink->offset); + (long long) info.size, sink->fd_offset, (long long) sink->fd_offset); write_size = write (sink->fd, info.data, info.size); @@ -283,7 +291,7 @@ gst_data_repo_sink_write_others (GstDataRepoSink * sink, GstBuffer * buffer) return GST_FLOW_ERROR; } - sink->offset += write_size; + sink->fd_offset += write_size; sink->total_samples++; return GST_FLOW_OK; @@ -296,29 +304,79 @@ static GstFlowReturn gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink, GstBuffer * buffer) { + guint num_tensors, i; + gsize write_size = 0, total_write = 0, tensor_size; GstMapInfo info; - gsize to_write = 0; + GstMemory *mem; + GstTensorMetaInfo meta; g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR); g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR); g_return_val_if_fail (sink->fd != 0, GST_FLOW_ERROR); g_return_val_if_fail (sink->json_object != NULL, GST_FLOW_ERROR); - g_return_val_if_fail (sink->json_offset_array != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (sink->sample_offset_array != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (sink->tensor_size_array != NULL, GST_FLOW_ERROR); + g_return_val_if_fail (sink->tensor_count_array != NULL, GST_FLOW_ERROR); GST_OBJECT_LOCK (sink); - gst_buffer_map (buffer, &info, GST_MAP_READ); - to_write = info.size; + num_tensors = gst_buffer_n_tensor (buffer); + GST_INFO_OBJECT (sink, "num_tensors: %u", num_tensors); - gst_buffer_unmap (buffer, &info); - GST_OBJECT_UNLOCK (sink); + for (i = 0; i < num_tensors; i++) { + mem = gst_tensor_buffer_get_nth_memory (buffer, i); + if (!gst_memory_map (mem, &info, GST_MAP_READ)) { + GST_ERROR_OBJECT (sink, "Failed to map memory"); + goto mem_map_error; + } + + if (!gst_tensor_meta_info_parse_header (&meta, info.data)) { + GST_ERROR_OBJECT (sink, "Invalid flexible tensors"); + goto error; + } + tensor_size = info.size; + + GST_LOG_OBJECT (sink, "tensor[%u] size: %zd", i, tensor_size); + GST_LOG_OBJECT (sink, + "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)", + (long long) tensor_size, sink->fd_offset + total_write, + (long long) sink->fd_offset + total_write); + + write_size = write (sink->fd, info.data, tensor_size); + if (write_size != tensor_size) { + GST_ERROR_OBJECT (sink, "Could not write data to file"); + goto error; + } + + json_array_add_int_element (sink->tensor_size_array, tensor_size); + total_write += write_size; + + gst_memory_unmap (mem, &info); + gst_memory_unref (mem); + } + + json_array_add_int_element (sink->sample_offset_array, sink->fd_offset); + sink->fd_offset += total_write; - json_array_add_int_element (sink->json_offset_array, sink->offset); + GST_LOG_OBJECT (sink, "flexible_tensor_count: %u", + sink->flexible_tensor_count); + json_array_add_int_element (sink->tensor_count_array, + sink->flexible_tensor_count); + sink->flexible_tensor_count += num_tensors; sink->total_samples++; - sink->offset += to_write; + + GST_OBJECT_UNLOCK (sink); return GST_FLOW_OK; + +error: + gst_memory_unmap (mem, &info); + gst_memory_unref (mem); +mem_map_error: + GST_OBJECT_UNLOCK (sink); + + return GST_FLOW_ERROR; } /** @@ -397,6 +455,9 @@ gst_data_repo_sink_render (GstBaseSink * bsink, GstBuffer * buffer) { GstDataRepoSink *sink = GST_DATA_REPO_SINK_CAST (bsink); + sink->is_flexible_tensors = + gst_tensor_pad_caps_is_flexible (GST_BASE_SINK_PAD (sink)); + switch (sink->data_type) { case GST_DATA_REPO_DATA_VIDEO: case GST_DATA_REPO_DATA_AUDIO: @@ -686,7 +747,9 @@ gst_data_repo_sink_write_json_meta_file (GstDataRepoSink * sink) g_return_val_if_fail (sink->data_type != GST_DATA_REPO_DATA_UNKNOWN, FALSE); g_return_val_if_fail (sink->fixed_caps != NULL, FALSE); g_return_val_if_fail (sink->json_object != NULL, FALSE); - g_return_val_if_fail (sink->json_offset_array != NULL, FALSE); + g_return_val_if_fail (sink->sample_offset_array != NULL, FALSE); + g_return_val_if_fail (sink->tensor_size_array != NULL, FALSE); + g_return_val_if_fail (sink->tensor_count_array != NULL, GST_FLOW_ERROR); caps_str = gst_caps_to_string (sink->fixed_caps); GST_DEBUG_OBJECT (sink, "caps string: %s", caps_str); @@ -696,21 +759,24 @@ gst_data_repo_sink_write_json_meta_file (GstDataRepoSink * sink) json_object_set_int_member (sink->json_object, "total_samples", sink->total_samples); - if (sink->is_flexible_tensors) - json_object_set_array_member (sink->json_object, "offset", - sink->json_offset_array); - else + if (sink->is_flexible_tensors) { + json_object_set_array_member (sink->json_object, "sample_offset", + sink->sample_offset_array); + json_object_set_array_member (sink->json_object, "tensor_size", + sink->tensor_size_array); + json_object_set_array_member (sink->json_object, "tensor_count", + sink->tensor_count_array); + } else { json_object_set_int_member (sink->json_object, "sample_size", sink->sample_size); - + } ret = __write_json (sink->json_object, sink->json_filename); if (!ret) { - GST_ERROR_OBJECT (sink, "Faild to write json meta file: %s", + GST_ERROR_OBJECT (sink, "Failed to write json meta file: %s", sink->json_filename); } json_object_unref (sink->json_object); - json_array_unref (sink->json_offset_array); g_free (caps_str); return ret; diff --git a/gst/datarepo/gstdatareposink.h b/gst/datarepo/gstdatareposink.h index d0a2021..45c327a 100644 --- a/gst/datarepo/gstdatareposink.h +++ b/gst/datarepo/gstdatareposink.h @@ -43,14 +43,17 @@ struct _GstDataRepoSink GstCaps *fixed_caps; /**< to get meta info */ JsonObject *json_object; /**< JSON object */ - JsonArray *json_offset_array; /**< offset array for flexible tensors */ + JsonArray *sample_offset_array; /**< offset array of sample */ + JsonArray *tensor_size_array; /**< size array of flexible tensor */ + JsonArray *tensor_count_array; /**< array for the number of cumulative tensors */ + guint flexible_tensor_count; gboolean is_flexible_tensors; gint fd; /**< open file descriptor*/ GstDataRepoDataType data_type; /**< data type */ gint total_samples; /**< The number of total samples, in the case of multi-files, it is used as an index. */ - guint64 offset; /**< offset of fd */ - guint sample_size; /**< size of one sample */ + guint64 fd_offset; /**< offset of fd */ + guint sample_size; /**< size of one sample */ /* property */ gchar *filename; /**< filename */ diff --git a/tests/nnstreamer_datarepo/unittest_datareposink.cc b/tests/nnstreamer_datarepo/unittest_datareposink.cc index 42000ba..d0d4d90 100644 --- a/tests/nnstreamer_datarepo/unittest_datareposink.cc +++ b/tests/nnstreamer_datarepo/unittest_datareposink.cc @@ -37,7 +37,6 @@ get_file_path (const gchar *filename) return file_path; } - /** * @brief Bus callback function */ @@ -46,6 +45,7 @@ bus_callback (GstBus *bus, GstMessage *message, gpointer data) { switch (GST_MESSAGE_TYPE (message)) { case GST_MESSAGE_EOS: + case GST_MESSAGE_ERROR: g_main_loop_quit ((GMainLoop *) data); break; default: @@ -56,6 +56,37 @@ bus_callback (GstBus *bus, GstMessage *message, gpointer data) } /** + * @brief create image test file + */ +static void +create_image_test_file () +{ + GstBus *bus; + GMainLoop *loop; + + loop = g_main_loop_new (NULL, FALSE); + + gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! " + "datareposink location=img_%02d.png json=img.json"); + + GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + g_free (str_pipeline); + ASSERT_NE (pipeline, nullptr); + + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + gst_object_unref (pipeline); + g_main_loop_unref (loop); +} + +/** * @brief Test for writing image files */ TEST (datareposink, writeImageFiles) @@ -67,15 +98,13 @@ TEST (datareposink, writeImageFiles) GMainLoop *loop; gint i = 0; gboolean ret; - - loop = g_main_loop_new (NULL, FALSE); - - gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! " - "datareposink location=image_%02d.png json=image.json"); + const gchar *str_pipeline + = "videotestsrc num-buffers=5 ! pngenc ! datareposink location=image_%02d.png json=image.json"; GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); - g_free (str_pipeline); + ASSERT_NE (pipeline, nullptr); + loop = g_main_loop_new (NULL, FALSE); bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); ASSERT_NE (bus, nullptr); gst_bus_add_watch (bus, bus_callback, loop); @@ -106,6 +135,13 @@ TEST (datareposink, writeImageFiles) g_object_unref (file); g_free (contents); ASSERT_EQ (ret, TRUE); + + g_remove ("image.json"); + for (i = 0; i < 5; i++) { + filename = g_strdup_printf ("image_%02d.png", i); + g_remove (filename); + g_free (filename); + } } /** @@ -118,18 +154,15 @@ TEST (datareposink, writeAudioRaw) GstBus *bus; GMainLoop *loop; gboolean ret; - - loop = g_main_loop_new (NULL, FALSE); - - gchar *str_pipeline = g_strdup ( - "gst-launch-1.0 audiotestsrc samplesperbuffer=44100 num-buffers=1 ! " - "audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! " - "datareposink location=audio.raw json=audio.json"); + const gchar *str_pipeline + = "audiotestsrc samplesperbuffer=44100 num-buffers=1 ! " + "audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! " + "datareposink location=audio.raw json=audio.json"; GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); - g_free (str_pipeline); ASSERT_NE (pipeline, nullptr); + loop = g_main_loop_new (NULL, FALSE); bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); ASSERT_NE (bus, nullptr); gst_bus_add_watch (bus, bus_callback, loop); @@ -155,6 +188,9 @@ TEST (datareposink, writeAudioRaw) g_object_unref (file); g_free (contents); ASSERT_EQ (ret, TRUE); + + g_remove ("audio.json"); + g_remove ("audio.raw"); } /** @@ -167,16 +203,13 @@ TEST (datareposink, writeVideoRaw) GstBus *bus; GMainLoop *loop; gboolean ret; - - loop = g_main_loop_new (NULL, FALSE); - - gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=10 ! " - "datareposink location=video.raw json=video.json"); + const gchar *str_pipeline + = "videotestsrc num-buffers=10 ! datareposink location=video.raw json=video.json"; GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); - g_free (str_pipeline); ASSERT_NE (pipeline, nullptr); + loop = g_main_loop_new (NULL, FALSE); bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); ASSERT_NE (bus, nullptr); gst_bus_add_watch (bus, bus_callback, loop); @@ -202,6 +235,9 @@ TEST (datareposink, writeVideoRaw) g_object_unref (file); g_free (contents); ASSERT_EQ (ret, TRUE); + + g_remove ("video.raw"); + g_remove ("video.json"); } /** @@ -225,8 +261,7 @@ TEST (datareposink, writeTensors) json_path = get_file_path (json); gchar *str_pipeline = g_strdup_printf ( - "gst-launch-1.0 datareposrc location=%s json=%s " - "start-sample-index=0 stop-sample-index=9 ! " + "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! " "datareposink name=datareposink location=mnist.data json=mnist.json", file_path, json_path); @@ -270,6 +305,63 @@ TEST (datareposink, writeTensors) g_free (file_path); g_free (json_path); ASSERT_EQ (ret, TRUE); + + g_remove ("mnist.data"); + g_remove ("mnist.json"); +} + +/** + * @brief Test for writing flexible tensors + */ +TEST (datareposink, writeFlexibleTensors) +{ + GFile *file = NULL; + gchar *contents = NULL; + GstBus *bus; + GMainLoop *loop; + gboolean ret; + const gchar *str_pipeline + = "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! " + "video/x-raw,format=RGB,width=176,height=144,framerate=10/1 ! tensor_converter ! join0.sink_0 " + "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! " + "video/x-raw,format=RGB,width=320,height=240,framerate=10/1 ! tensor_converter ! join0.sink_1 " + "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! " + "video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tensor_converter ! join0.sink_2 " + "join name=join0 ! other/tensors,format=flexible ! " + "datareposink location=flexible.data json=flexible.json"; + + GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); + + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + gst_object_unref (pipeline); + g_main_loop_unref (loop); + + /* Confirm file creation */ + file = g_file_new_for_path ("flexible.data"); + ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL); + g_object_unref (file); + g_free (contents); + ASSERT_EQ (ret, TRUE); + + /* Confirm file creation */ + file = g_file_new_for_path ("flexible.json"); + ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL); + g_object_unref (file); + g_free (contents); + ASSERT_EQ (ret, TRUE); + + g_remove ("flexible.data"); + g_remove ("flexible.json"); } /** @@ -279,11 +371,11 @@ TEST (datareposink, invalidJsonPath0_n) { GstElement *datareposink = NULL; - gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=10 ! pngenc ! " - "datareposink name=datareposink"); + const gchar *str_pipeline + = "videotestsrc num-buffers=10 ! pngenc ! datareposink name=datareposink"; GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); - g_free (str_pipeline); + ASSERT_NE (pipeline, nullptr); datareposink = gst_bin_get_by_name (GST_BIN (pipeline), "datareposink"); EXPECT_NE (datareposink, nullptr); @@ -305,11 +397,10 @@ TEST (datareposink, invalidFilePath0_n) { GstElement *datareposink = NULL; - gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=10 ! pngenc ! " - "datareposink name=datareposink"); + const gchar *str_pipeline + = "videotestsrc num-buffers=10 ! pngenc ! datareposink name=datareposink"; GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); - g_free (str_pipeline); ASSERT_NE (pipeline, nullptr); datareposink = gst_bin_get_by_name (GST_BIN (pipeline), "datareposink"); @@ -330,11 +421,10 @@ TEST (datareposink, invalidFilePath0_n) */ TEST (datareposink, unsupportedVideoCaps0_n) { - gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc ! vp8enc ! " - "datareposink location=video.raw json=video.json"); + const gchar *str_pipeline + = "videotestsrc ! vp8enc ! datareposink location=video.raw json=video.json"; GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); - g_free (str_pipeline); ASSERT_NE (pipeline, nullptr); /* Could not to to GST_STATE_PLAYING state due to caps negotiation failure */ @@ -348,11 +438,10 @@ TEST (datareposink, unsupportedVideoCaps0_n) */ TEST (datareposink, unsupportedAudioCaps0_n) { - gchar *str_pipeline = g_strdup ("gst-launch-1.0 audiotestsrc ! audio/x-raw,rate=44100,channels=2 ! wavenc ! " - "datareposink location=audio.raw json=audio.json"); + const gchar *str_pipeline = "audiotestsrc ! audio/x-raw,rate=44100,channels=2 ! " + "wavenc ! datareposink location=audio.raw json=audio.json"; GstElement *pipeline = gst_parse_launch (str_pipeline, NULL); - g_free (str_pipeline); ASSERT_NE (pipeline, nullptr); /* Could not to to GST_STATE_PLAYING state due to caps negotiation failure */ @@ -362,26 +451,62 @@ TEST (datareposink, unsupportedAudioCaps0_n) } /** - * @brief remove test file + * @brief Test for writing flexible tensors */ -static void -remove_test_file (void) +TEST (datareposink, writeFlexibleTensors_n) { - gchar *filename = NULL; + GFile *file = NULL; + GstBus *bus; + GMainLoop *loop; + GstElement *pipeline; + GFileInfo *file_info = NULL; + gint64 size = 0; int i; + gchar *filename = NULL; - g_remove ("audio.json"); - g_remove ("audio.raw"); - g_remove ("video.json"); - g_remove ("video.raw"); - g_remove ("mnist.json"); - g_remove ("mnist.data"); + create_image_test_file (); + + /* Insert non-Flexible Tensor data after negotiating with flexible caps. */ + const gchar *str_pipeline + = "multifilesrc location=img_%02d.png caps=other/tensors,format=flexible ! " + "datareposink location=flexible.data json=flexible.json"; + + pipeline = gst_parse_launch (str_pipeline, NULL); + ASSERT_NE (pipeline, nullptr); + + loop = g_main_loop_new (NULL, FALSE); + bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); + ASSERT_NE (bus, nullptr); + gst_bus_add_watch (bus, bus_callback, loop); + gst_object_unref (bus); + + setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT); + g_main_loop_run (loop); + + setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT); + gst_object_unref (pipeline); + g_main_loop_unref (loop); + + /* Confirm file creation */ + file = g_file_new_for_path ("flexible.data"); + ASSERT_NE (file, nullptr); + file_info = g_file_query_info ( + file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL); + ASSERT_NE (file_info, nullptr); + size = g_file_info_get_size (file_info); + ASSERT_EQ (size, 0); + g_object_unref (file_info); + g_object_unref (file); for (i = 0; i < 5; i++) { - filename = g_strdup_printf ("image_%02d.png", i); + filename = g_strdup_printf ("img_%02d.png", i); g_remove (filename); g_free (filename); } + + g_remove ("img.json"); + g_remove ("flexible.json"); + g_remove ("flexible.data"); } /** @@ -406,7 +531,5 @@ main (int argc, char **argv) g_warning ("catch `testing::internal::GoogleTestFailureException`"); } - remove_test_file (); - return result; }