#include <fcntl.h>
#include <unistd.h>
#include <nnstreamer_plugin_api.h>
+#include <tensor_common.h>
#include <nnstreamer_util.h>
#include "gstdatareposink.h"
{
sink->filename = NULL;
sink->fd = 0;
- sink->offset = 0;
+ sink->fd_offset = 0;
sink->data_type = GST_DATA_REPO_DATA_UNKNOWN;
sink->is_flexible_tensors = FALSE;
sink->fixed_caps = NULL;
sink->json_object = NULL;
sink->total_samples = 0;
- /* init here for flexible tensor */
+ sink->flexible_tensor_count = 0;
sink->json_object = json_object_new ();
- sink->json_offset_array = json_array_new ();
+ sink->sample_offset_array = json_array_new ();
+ sink->tensor_size_array = json_array_new ();
+ sink->tensor_count_array = json_array_new ();
}
/**
g_free (sink->filename);
g_free (sink->json_filename);
+ if (sink->fd) {
+ g_close (sink->fd, NULL);
+ sink->fd = 0;
+ }
+
/* Check for gst-inspect log */
if (sink->fixed_caps)
gst_caps_unref (sink->fixed_caps);
GST_LOG_OBJECT (sink,
"Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)",
- (long long) info.size, sink->offset, (long long) sink->offset);
+ (long long) info.size, sink->fd_offset, (long long) sink->fd_offset);
write_size = write (sink->fd, info.data, info.size);
return GST_FLOW_ERROR;
}
- sink->offset += write_size;
+ sink->fd_offset += write_size;
sink->total_samples++;
return GST_FLOW_OK;
gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink,
GstBuffer * buffer)
{
+ guint num_tensors, i;
+ gsize write_size = 0, total_write = 0, tensor_size;
GstMapInfo info;
- gsize to_write = 0;
+ GstMemory *mem;
+ GstTensorMetaInfo meta;
g_return_val_if_fail (sink != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (sink->fd != 0, GST_FLOW_ERROR);
g_return_val_if_fail (sink->json_object != NULL, GST_FLOW_ERROR);
- g_return_val_if_fail (sink->json_offset_array != NULL, GST_FLOW_ERROR);
+ g_return_val_if_fail (sink->sample_offset_array != NULL, GST_FLOW_ERROR);
+ g_return_val_if_fail (sink->tensor_size_array != NULL, GST_FLOW_ERROR);
+ g_return_val_if_fail (sink->tensor_count_array != NULL, GST_FLOW_ERROR);
GST_OBJECT_LOCK (sink);
- gst_buffer_map (buffer, &info, GST_MAP_READ);
- to_write = info.size;
+ num_tensors = gst_buffer_n_tensor (buffer);
+ GST_INFO_OBJECT (sink, "num_tensors: %u", num_tensors);
- gst_buffer_unmap (buffer, &info);
- GST_OBJECT_UNLOCK (sink);
+ for (i = 0; i < num_tensors; i++) {
+ mem = gst_tensor_buffer_get_nth_memory (buffer, i);
+ if (!gst_memory_map (mem, &info, GST_MAP_READ)) {
+ GST_ERROR_OBJECT (sink, "Failed to map memory");
+ goto mem_map_error;
+ }
+
+ if (!gst_tensor_meta_info_parse_header (&meta, info.data)) {
+ GST_ERROR_OBJECT (sink, "Invalid flexible tensors");
+ goto error;
+ }
+ tensor_size = info.size;
+
+ GST_LOG_OBJECT (sink, "tensor[%u] size: %zd", i, tensor_size);
+ GST_LOG_OBJECT (sink,
+ "Writing %lld bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)",
+ (long long) tensor_size, sink->fd_offset + total_write,
+ (long long) sink->fd_offset + total_write);
+
+ write_size = write (sink->fd, info.data, tensor_size);
+ if (write_size != tensor_size) {
+ GST_ERROR_OBJECT (sink, "Could not write data to file");
+ goto error;
+ }
+
+ json_array_add_int_element (sink->tensor_size_array, tensor_size);
+ total_write += write_size;
+
+ gst_memory_unmap (mem, &info);
+ gst_memory_unref (mem);
+ }
+
+ json_array_add_int_element (sink->sample_offset_array, sink->fd_offset);
+ sink->fd_offset += total_write;
- json_array_add_int_element (sink->json_offset_array, sink->offset);
+ GST_LOG_OBJECT (sink, "flexible_tensor_count: %u",
+ sink->flexible_tensor_count);
+ json_array_add_int_element (sink->tensor_count_array,
+ sink->flexible_tensor_count);
+ sink->flexible_tensor_count += num_tensors;
sink->total_samples++;
- sink->offset += to_write;
+
+ GST_OBJECT_UNLOCK (sink);
return GST_FLOW_OK;
+
+error:
+ gst_memory_unmap (mem, &info);
+ gst_memory_unref (mem);
+mem_map_error:
+ GST_OBJECT_UNLOCK (sink);
+
+ return GST_FLOW_ERROR;
}
/**
{
GstDataRepoSink *sink = GST_DATA_REPO_SINK_CAST (bsink);
+ sink->is_flexible_tensors =
+ gst_tensor_pad_caps_is_flexible (GST_BASE_SINK_PAD (sink));
+
switch (sink->data_type) {
case GST_DATA_REPO_DATA_VIDEO:
case GST_DATA_REPO_DATA_AUDIO:
g_return_val_if_fail (sink->data_type != GST_DATA_REPO_DATA_UNKNOWN, FALSE);
g_return_val_if_fail (sink->fixed_caps != NULL, FALSE);
g_return_val_if_fail (sink->json_object != NULL, FALSE);
- g_return_val_if_fail (sink->json_offset_array != NULL, FALSE);
+ g_return_val_if_fail (sink->sample_offset_array != NULL, FALSE);
+ g_return_val_if_fail (sink->tensor_size_array != NULL, FALSE);
+ g_return_val_if_fail (sink->tensor_count_array != NULL, GST_FLOW_ERROR);
caps_str = gst_caps_to_string (sink->fixed_caps);
GST_DEBUG_OBJECT (sink, "caps string: %s", caps_str);
json_object_set_int_member (sink->json_object, "total_samples",
sink->total_samples);
- if (sink->is_flexible_tensors)
- json_object_set_array_member (sink->json_object, "offset",
- sink->json_offset_array);
- else
+ if (sink->is_flexible_tensors) {
+ json_object_set_array_member (sink->json_object, "sample_offset",
+ sink->sample_offset_array);
+ json_object_set_array_member (sink->json_object, "tensor_size",
+ sink->tensor_size_array);
+ json_object_set_array_member (sink->json_object, "tensor_count",
+ sink->tensor_count_array);
+ } else {
json_object_set_int_member (sink->json_object, "sample_size",
sink->sample_size);
-
+ }
ret = __write_json (sink->json_object, sink->json_filename);
if (!ret) {
- GST_ERROR_OBJECT (sink, "Faild to write json meta file: %s",
+ GST_ERROR_OBJECT (sink, "Failed to write json meta file: %s",
sink->json_filename);
}
json_object_unref (sink->json_object);
- json_array_unref (sink->json_offset_array);
g_free (caps_str);
return ret;
return file_path;
}
-
/**
* @brief Bus callback function
*/
{
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
+ case GST_MESSAGE_ERROR:
g_main_loop_quit ((GMainLoop *) data);
break;
default:
}
/**
+ * @brief create image test file
+ */
+static void
+create_image_test_file ()
+{
+ GstBus *bus;
+ GMainLoop *loop;
+
+ loop = g_main_loop_new (NULL, FALSE);
+
+ gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! "
+ "datareposink location=img_%02d.png json=img.json");
+
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ g_free (str_pipeline);
+ ASSERT_NE (pipeline, nullptr);
+
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+}
+
+/**
* @brief Test for writing image files
*/
TEST (datareposink, writeImageFiles)
GMainLoop *loop;
gint i = 0;
gboolean ret;
-
- loop = g_main_loop_new (NULL, FALSE);
-
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! "
- "datareposink location=image_%02d.png json=image.json");
+ const gchar *str_pipeline
+ = "videotestsrc num-buffers=5 ! pngenc ! datareposink location=image_%02d.png json=image.json";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
+ ASSERT_NE (pipeline, nullptr);
+ loop = g_main_loop_new (NULL, FALSE);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
ASSERT_NE (bus, nullptr);
gst_bus_add_watch (bus, bus_callback, loop);
g_object_unref (file);
g_free (contents);
ASSERT_EQ (ret, TRUE);
+
+ g_remove ("image.json");
+ for (i = 0; i < 5; i++) {
+ filename = g_strdup_printf ("image_%02d.png", i);
+ g_remove (filename);
+ g_free (filename);
+ }
}
/**
GstBus *bus;
GMainLoop *loop;
gboolean ret;
-
- loop = g_main_loop_new (NULL, FALSE);
-
- gchar *str_pipeline = g_strdup (
- "gst-launch-1.0 audiotestsrc samplesperbuffer=44100 num-buffers=1 ! "
- "audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! "
- "datareposink location=audio.raw json=audio.json");
+ const gchar *str_pipeline
+ = "audiotestsrc samplesperbuffer=44100 num-buffers=1 ! "
+ "audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! "
+ "datareposink location=audio.raw json=audio.json";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
+ loop = g_main_loop_new (NULL, FALSE);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
ASSERT_NE (bus, nullptr);
gst_bus_add_watch (bus, bus_callback, loop);
g_object_unref (file);
g_free (contents);
ASSERT_EQ (ret, TRUE);
+
+ g_remove ("audio.json");
+ g_remove ("audio.raw");
}
/**
GstBus *bus;
GMainLoop *loop;
gboolean ret;
-
- loop = g_main_loop_new (NULL, FALSE);
-
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=10 ! "
- "datareposink location=video.raw json=video.json");
+ const gchar *str_pipeline
+ = "videotestsrc num-buffers=10 ! datareposink location=video.raw json=video.json";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
+ loop = g_main_loop_new (NULL, FALSE);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
ASSERT_NE (bus, nullptr);
gst_bus_add_watch (bus, bus_callback, loop);
g_object_unref (file);
g_free (contents);
ASSERT_EQ (ret, TRUE);
+
+ g_remove ("video.raw");
+ g_remove ("video.json");
}
/**
json_path = get_file_path (json);
gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc location=%s json=%s "
- "start-sample-index=0 stop-sample-index=9 ! "
+ "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! "
"datareposink name=datareposink location=mnist.data json=mnist.json",
file_path, json_path);
g_free (file_path);
g_free (json_path);
ASSERT_EQ (ret, TRUE);
+
+ g_remove ("mnist.data");
+ g_remove ("mnist.json");
+}
+
+/**
+ * @brief Test for writing flexible tensors
+ */
+TEST (datareposink, writeFlexibleTensors)
+{
+ GFile *file = NULL;
+ gchar *contents = NULL;
+ GstBus *bus;
+ GMainLoop *loop;
+ gboolean ret;
+ const gchar *str_pipeline
+ = "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+ "video/x-raw,format=RGB,width=176,height=144,framerate=10/1 ! tensor_converter ! join0.sink_0 "
+ "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+ "video/x-raw,format=RGB,width=320,height=240,framerate=10/1 ! tensor_converter ! join0.sink_1 "
+ "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+ "video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tensor_converter ! join0.sink_2 "
+ "join name=join0 ! other/tensors,format=flexible ! "
+ "datareposink location=flexible.data json=flexible.json";
+
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+
+ /* Confirm file creation */
+ file = g_file_new_for_path ("flexible.data");
+ ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL);
+ g_object_unref (file);
+ g_free (contents);
+ ASSERT_EQ (ret, TRUE);
+
+ /* Confirm file creation */
+ file = g_file_new_for_path ("flexible.json");
+ ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL);
+ g_object_unref (file);
+ g_free (contents);
+ ASSERT_EQ (ret, TRUE);
+
+ g_remove ("flexible.data");
+ g_remove ("flexible.json");
}
/**
{
GstElement *datareposink = NULL;
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=10 ! pngenc ! "
- "datareposink name=datareposink");
+ const gchar *str_pipeline
+ = "videotestsrc num-buffers=10 ! pngenc ! datareposink name=datareposink";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
+ ASSERT_NE (pipeline, nullptr);
datareposink = gst_bin_get_by_name (GST_BIN (pipeline), "datareposink");
EXPECT_NE (datareposink, nullptr);
{
GstElement *datareposink = NULL;
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=10 ! pngenc ! "
- "datareposink name=datareposink");
+ const gchar *str_pipeline
+ = "videotestsrc num-buffers=10 ! pngenc ! datareposink name=datareposink";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
datareposink = gst_bin_get_by_name (GST_BIN (pipeline), "datareposink");
*/
TEST (datareposink, unsupportedVideoCaps0_n)
{
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc ! vp8enc ! "
- "datareposink location=video.raw json=video.json");
+ const gchar *str_pipeline
+ = "videotestsrc ! vp8enc ! datareposink location=video.raw json=video.json";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
/* Could not to to GST_STATE_PLAYING state due to caps negotiation failure */
*/
TEST (datareposink, unsupportedAudioCaps0_n)
{
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 audiotestsrc ! audio/x-raw,rate=44100,channels=2 ! wavenc ! "
- "datareposink location=audio.raw json=audio.json");
+ const gchar *str_pipeline = "audiotestsrc ! audio/x-raw,rate=44100,channels=2 ! "
+ "wavenc ! datareposink location=audio.raw json=audio.json";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
/* Could not to to GST_STATE_PLAYING state due to caps negotiation failure */
}
/**
- * @brief remove test file
+ * @brief Test for writing flexible tensors
*/
-static void
-remove_test_file (void)
+TEST (datareposink, writeFlexibleTensors_n)
{
- gchar *filename = NULL;
+ GFile *file = NULL;
+ GstBus *bus;
+ GMainLoop *loop;
+ GstElement *pipeline;
+ GFileInfo *file_info = NULL;
+ gint64 size = 0;
int i;
+ gchar *filename = NULL;
- g_remove ("audio.json");
- g_remove ("audio.raw");
- g_remove ("video.json");
- g_remove ("video.raw");
- g_remove ("mnist.json");
- g_remove ("mnist.data");
+ create_image_test_file ();
+
+ /* Insert non-Flexible Tensor data after negotiating with flexible caps. */
+ const gchar *str_pipeline
+ = "multifilesrc location=img_%02d.png caps=other/tensors,format=flexible ! "
+ "datareposink location=flexible.data json=flexible.json";
+
+ pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+
+ /* Confirm file creation */
+ file = g_file_new_for_path ("flexible.data");
+ ASSERT_NE (file, nullptr);
+ file_info = g_file_query_info (
+ file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+ ASSERT_NE (file_info, nullptr);
+ size = g_file_info_get_size (file_info);
+ ASSERT_EQ (size, 0);
+ g_object_unref (file_info);
+ g_object_unref (file);
for (i = 0; i < 5; i++) {
- filename = g_strdup_printf ("image_%02d.png", i);
+ filename = g_strdup_printf ("img_%02d.png", i);
g_remove (filename);
g_free (filename);
}
+
+ g_remove ("img.json");
+ g_remove ("flexible.json");
+ g_remove ("flexible.data");
}
/**
g_warning ("catch `testing::internal::GoogleTestFailureException`");
}
- remove_test_file ();
-
return result;
}