/**
* @brief Tensors caps
*/
-#define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible }")
+#define TENSOR_CAPS GST_TENSORS_CAP_MAKE ("{ static, flexible, sparse }")
/**
* @brief Video caps
*/
sink->fd = 0;
sink->fd_offset = 0;
sink->data_type = GST_DATA_REPO_DATA_UNKNOWN;
- sink->is_flexible_tensors = FALSE;
+ sink->is_static_tensors = FALSE;
sink->fixed_caps = NULL;
sink->json_object = NULL;
sink->total_samples = 0;
- sink->flexible_tensor_count = 0;
+ sink->cumulative_tensors = 0;
sink->json_object = json_object_new ();
sink->sample_offset_array = json_array_new ();
sink->tensor_size_array = json_array_new ();
}
/**
- * @brief Function to write flexible tensors
+ * @brief Function to write flexible tensors or sparse tensors
*/
static GstFlowReturn
-gst_data_repo_sink_write_flexible_tensors (GstDataRepoSink * sink,
+gst_data_repo_sink_write_flexible_or_sparse_tensors (GstDataRepoSink * sink,
GstBuffer * buffer)
{
guint num_tensors, i;
}
if (!gst_tensor_meta_info_parse_header (&meta, info.data)) {
- GST_ERROR_OBJECT (sink, "Invalid flexible tensors");
+ GST_ERROR_OBJECT (sink,
+ "Invalid format of tensors, the format is static.");
goto error;
}
tensor_size = info.size;
json_array_add_int_element (sink->sample_offset_array, sink->fd_offset);
sink->fd_offset += total_write;
- GST_LOG_OBJECT (sink, "flexible_tensor_count: %u",
- sink->flexible_tensor_count);
+ GST_LOG_OBJECT (sink, "cumulative_tensors: %u", sink->cumulative_tensors);
json_array_add_int_element (sink->tensor_count_array,
- sink->flexible_tensor_count);
- sink->flexible_tensor_count += num_tensors;
+ sink->cumulative_tensors);
+ sink->cumulative_tensors += num_tensors;
sink->total_samples++;
{
GstDataRepoSink *sink = GST_DATA_REPO_SINK_CAST (bsink);
- sink->is_flexible_tensors =
- gst_tensor_pad_caps_is_flexible (GST_BASE_SINK_PAD (sink));
-
switch (sink->data_type) {
case GST_DATA_REPO_DATA_VIDEO:
case GST_DATA_REPO_DATA_AUDIO:
case GST_DATA_REPO_DATA_TEXT:
case GST_DATA_REPO_DATA_OCTET:
+ return gst_data_repo_sink_write_others (sink, buffer);
case GST_DATA_REPO_DATA_TENSOR:
- if (sink->is_flexible_tensors)
- return gst_data_repo_sink_write_flexible_tensors (sink, buffer);
- /* default write function for tensors(fixed), video, audio, text and octet */
+ {
+ sink->is_static_tensors =
+ gst_tensor_pad_caps_is_static (GST_BASE_SINK_PAD (sink));
+ if (!sink->is_static_tensors)
+ return gst_data_repo_sink_write_flexible_or_sparse_tensors (sink,
+ buffer);
return gst_data_repo_sink_write_others (sink, buffer);
+ }
case GST_DATA_REPO_DATA_IMAGE:
return gst_data_repo_sink_write_multi_images (sink, buffer);
default:
json_object_set_int_member (sink->json_object, "total_samples",
sink->total_samples);
- if (sink->is_flexible_tensors) {
+ if (sink->data_type == GST_DATA_REPO_DATA_TENSOR && !sink->is_static_tensors) {
json_object_set_array_member (sink->json_object, "sample_offset",
sink->sample_offset_array);
json_object_set_array_member (sink->json_object, "tensor_size",
JsonArray *sample_offset_array; /**< offset array of sample */
JsonArray *tensor_size_array; /**< size array of flexible tensor */
JsonArray *tensor_count_array; /**< array for the number of cumulative tensors */
- guint flexible_tensor_count;
+ guint cumulative_tensors; /**< the number of cumulated tensors */
- gboolean is_flexible_tensors;
+ gboolean is_static_tensors;
gint fd; /**< open file descriptor*/
GstDataRepoDataType data_type; /**< data type */
gint total_samples; /**< The number of total samples, in the case of multi-files, it is used as an index. */
src->caps = NULL;
src->sample_size = 0;
src->need_changed_caps = FALSE;
- src->is_flexible_tensors = FALSE;
+ src->is_static_tensors = FALSE;
/* Filling the buffer should be pending until set_caps() */
gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
g_return_if_fail (src->shuffled_index_array != NULL);
GST_LOG_OBJECT (src, "samples index are shuffled");
-
/* Fisher-Yates algorithm */
/* The last index is the number of samples - 1. */
for (i = src->num_samples - 1; i > 0; i--) {
value_j = g_array_index (src->shuffled_index_array, guint, j);
/* shuffled_index_array->data type is gchar * */
- *(src->shuffled_index_array->data + (sizeof (guint) * i)) = value_j;
- *(src->shuffled_index_array->data + (sizeof (guint) * j)) = value_i;
+ *(guint *) (src->shuffled_index_array->data + (sizeof (guint) * i)) =
+ value_j;
+ *(guint *) (src->shuffled_index_array->data + (sizeof (guint) * j)) =
+ value_i;
}
for (i = 0; i < src->shuffled_index_array->len; i++) {
}
/**
- * @brief Function to read tensors
+ * @brief Function to read flexible or sparse tensors
*/
static GstFlowReturn
-gst_data_repo_src_read_flexible_tensors (GstDataRepoSrc * src,
+gst_data_repo_src_read_flexible_or_sparse_tensors (GstDataRepoSrc * src,
GstBuffer ** buffer)
{
GstFlowReturn ret = GST_FLOW_OK;
case GST_DATA_REPO_DATA_OCTET:
return gst_data_repo_src_read_others (src, buffer);
case GST_DATA_REPO_DATA_TENSOR:
- if (src->is_flexible_tensors)
- return gst_data_repo_src_read_flexible_tensors (src, buffer);
- else
+ if (src->is_static_tensors)
return gst_data_repo_src_read_tensors (src, buffer);
+ else
+ return gst_data_repo_src_read_flexible_or_sparse_tensors (src, buffer);
case GST_DATA_REPO_DATA_IMAGE:
return gst_data_repo_src_read_multi_images (src, buffer);
default:
s = gst_caps_get_structure (caps, 0);
v = gst_structure_get_value (s, "format");
format = g_value_get_string (v);
- if (g_strcmp0 (format, "flexible") == 0)
- src->is_flexible_tensors = TRUE;
+ if (g_strcmp0 (format, "static") == 0)
+ src->is_static_tensors = TRUE;
break;
default:
break;
GST_INFO_OBJECT (src, "sample_size: %d", src->sample_size);
}
- if (src->is_flexible_tensors) {
+ if (src->data_type == GST_DATA_REPO_DATA_TENSOR && !src->is_static_tensors) {
if (!json_object_has_member (object, "sample_offset")) {
GST_ERROR_OBJECT (src, "There is not sample_offset field: %s", contents);
goto error;
/** if data_type is not GST_DATA_REPO_DATA_UNKNOWN and sample_size is 0 then
'caps' is set by property and sample size needs to be set by blocksize
(in the case of otect and text) */
- if (src->sample_size == 0 && !src->is_flexible_tensors) {
+ if (src->sample_size == 0 && (src->data_type == GST_DATA_REPO_DATA_OCTET
+ || src->data_type == GST_DATA_REPO_DATA_TEXT)) {
basesrc = GST_BASE_SRC (src);
g_object_get (G_OBJECT (basesrc), "blocksize", &blocksize, NULL);
GST_DEBUG_OBJECT (src, "blocksize = %d", blocksize);
GstPushSrc parent; /**< parent object */
GstPad *src_pad;
- gboolean is_flexible_tensors;
+ gboolean is_static_tensors;
gboolean is_start; /**< check if datareposrc is started */
gboolean successful_read; /**< used for checking EOS when reading more than one images(multi-files) from a path */
gint fd; /**< open file descriptor */
g_remove ("flexible.json");
}
+
+/**
+ * @brief Test for writing sparse tensors
+ */
+TEST (datareposink, writeSparseTensors)
+{
+ GFile *file = NULL;
+ gchar *contents = NULL;
+ GstBus *bus;
+ GMainLoop *loop;
+ gchar *file_path = NULL;
+ gchar *json_path = NULL;
+ gboolean ret;
+ gint64 size, org_size = 31760;
+ GFileInfo *info = NULL;
+
+ loop = g_main_loop_new (NULL, FALSE);
+
+ file_path = get_file_path (filename);
+ json_path = get_file_path (json);
+
+ gchar *str_pipeline = g_strdup_printf (
+ "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! "
+ "tensor_sparse_enc ! other/tensors,format=sparse,framerate=0/1 ! "
+ "datareposink location=sparse.data json=sparse.json",
+ file_path, json_path);
+
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ g_free (str_pipeline);
+ ASSERT_NE (pipeline, nullptr);
+
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+
+ /* Confirm file creation */
+ file = g_file_new_for_path ("sparse.json");
+ ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL);
+ g_free (contents);
+ g_object_unref (file);
+ g_free (file_path);
+ g_free (json_path);
+ ASSERT_EQ (ret, TRUE);
+
+ /* Confirm file creation */
+ file = g_file_new_for_path ("sparse.data");
+ ret = g_file_load_contents (file, NULL, &contents, NULL, NULL, NULL);
+ info = g_file_query_info (
+ file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+ size = g_file_info_get_size (info);
+ g_object_unref (file);
+ g_object_unref (info);
+ g_free (contents);
+ ASSERT_EQ (ret, TRUE);
+
+ /* The size of one mnist sample is 3176 bytes, the number of samples for test is 10. */
+ EXPECT_LT (size, org_size);
+
+ g_remove ("sparse.data");
+ g_remove ("sparse.json");
+}
+
/**
* @brief Test for writing a file with invalid param (JSON path)
*/
}
/**
+ * @brief Test for writing flexible tensors
+ */
+TEST (datareposink, writeSparseTensors_n)
+{
+ GFile *file = NULL;
+ GstBus *bus;
+ GMainLoop *loop;
+ GstElement *pipeline;
+ GFileInfo *file_info = NULL;
+ gint64 size = 0;
+ int i;
+ gchar *filename = NULL;
+
+ create_image_test_file ();
+
+ /* Insert non-Flexible Tensor data after negotiating with flexible caps. */
+ const gchar *str_pipeline
+ = "multifilesrc location=img_%02d.png ! other/tensors,format=sparse,framerate=0/1 ! "
+ "datareposink location=sparse.data json=sparse.json";
+
+ pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+
+ /* Confirm file creation */
+ file = g_file_new_for_path ("sparse.data");
+ ASSERT_NE (file, nullptr);
+ file_info = g_file_query_info (
+ file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+ ASSERT_NE (file_info, nullptr);
+ size = g_file_info_get_size (file_info);
+ ASSERT_EQ (size, 0);
+ g_object_unref (file_info);
+ g_object_unref (file);
+
+ for (i = 0; i < 5; i++) {
+ filename = g_strdup_printf ("img_%02d.png", i);
+ g_remove (filename);
+ g_free (filename);
+ }
+
+ g_remove ("img.json");
+ g_remove ("sparse.json");
+ g_remove ("sparse.data");
+}
+
+/**
* @brief Main GTest
*/
int
}
/**
+ * @brief create sparse tensors file
+ */
+static void
+create_sparse_tensors_test_file ()
+{
+ GstBus *bus;
+ GMainLoop *loop;
+ gchar *file_path = get_file_path (filename);
+ gchar *json_path = get_file_path (json);
+
+ const gchar *str_pipeline = g_strdup_printf (
+ "datareposrc location=%s json=%s start-sample-index=0 stop-sample-index=9 ! "
+ "tensor_sparse_enc ! other/tensors,format=sparse,framerate=0/1 ! "
+ "datareposink location=sparse.data json=sparse.json",
+ file_path, json_path);
+
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+}
+
+/**
* @brief create flexible tensors file
*/
static void
}
/**
+ * @brief Test for reading a file composed of sparse tensors
+ * the default shuffle is TRUE.
+ */
+TEST (datareposrc, readSparseTensors)
+{
+ GFile *file = NULL;
+ GFileInfo *info = NULL;
+ gint64 size, org_size = 31760;
+ gint buffer_count = 0;
+ GstElement *tensor_sink;
+ GstBus *bus;
+ const gchar *str_pipeline = NULL;
+ GMainLoop *loop;
+ GCallback handler = G_CALLBACK (new_data_cb);
+ str_pipeline
+ = "datareposrc location=sparse.data json=sparse.json ! tensor_sparse_dec ! "
+ "other/tensors, format=static, num_tensors=2, framerate=0/1, "
+ "dimensions=1:1:784:1.1:1:10:1, types=\"float32,float32\" ! tee name= t "
+ "t. ! queue ! filesink location=sample.data t. ! queue ! tensor_sink name=tensor_sink0";
+
+ create_sparse_tensors_test_file ();
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
+
+ tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+ ASSERT_NE (tensor_sink, nullptr);
+
+ g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ EXPECT_NE (buffer_count, 0);
+ handler = NULL;
+
+ file = g_file_new_for_path ("sparse.data");
+ info = g_file_query_info (
+ file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+ size = g_file_info_get_size (info);
+ g_object_unref (file);
+ g_object_unref (info);
+ EXPECT_LT (size, org_size);
+
+ file = g_file_new_for_path ("sample.data");
+ info = g_file_query_info (
+ file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL);
+ size = g_file_info_get_size (info);
+ g_object_unref (file);
+ g_object_unref (info);
+ EXPECT_EQ (size, org_size);
+
+ gst_object_unref (tensor_sink);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+
+ g_remove ("sparse.json");
+ g_remove ("sparse.data");
+ g_remove ("sample.data");
+}
+
+/**
* @brief Test for reading a tensors file with Caps property
*/
TEST (datareposrc, readTensorsNoJSONWithCapsParam)
}
/**
+ * @brief Test for reading a file composed of non-sparse tensors
+ * the default shuffle is TRUE.
+ */
+TEST (datareposrc, readInvalidSparseTensors)
+{
+ gint buffer_count = 0;
+ GstBus *bus;
+ GMainLoop *loop;
+ GCallback handler = G_CALLBACK (new_data_cb);
+ const gchar *str_pipeline
+ = "datareposrc location=audio1.raw json=sparse.json ! tensor_sink name=tensor_sink0";
+ GstElement *tensor_sink;
+
+ create_sparse_tensors_test_file ();
+ create_audio_test_file ();
+
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
+
+ tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+ ASSERT_NE (tensor_sink, nullptr);
+
+ g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ /* EXPECT_EQ not checked due to internal data stream error */
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+ /* Internal data stream error */
+ EXPECT_EQ (buffer_count, 0);
+ handler = NULL;
+
+ gst_object_unref (tensor_sink);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+
+ g_remove ("audio1.json");
+ g_remove ("audio1.raw");
+ g_remove ("sparse.json");
+ g_remove ("sparse.data");
+}
+
+/**
* @brief Main GTest
*/
int