#include <sys/types.h>
#include <sys/stat.h>
#include <inttypes.h>
-#include <json-glib/json-glib.h>
#include "gstdatareposrc.h"
#define struct_stat struct stat
src->filename = NULL;
src->json_filename = NULL;
src->tensors_seq_str = NULL;
+ src->file_size = 0;
src->fd = 0;
src->data_type = GST_DATA_REPO_DATA_UNKNOWN;
- src->offset = 0;
+ src->fd_offset = 0;
src->start_offset = 0;
src->last_offset = 0;
src->successful_read = FALSE;
src->epochs = DEFAULT_EPOCHS;
src->shuffled_index_array = g_array_new (FALSE, FALSE, sizeof (guint));
src->array_index = 0;
+ src->sample_offset_array = NULL;
+ src->sample_offset_array_len = 0;
+ src->tensor_size_array = NULL;
+ src->tensor_size_array_len = 0;
+ src->tensor_count_array = NULL;
+ src->tensor_count_array_len = 0;
src->first_epoch_is_done = FALSE;
src->is_shuffle = DEFAULT_IS_SHUFFLE;
src->num_samples = 0;
src->caps = NULL;
src->sample_size = 0;
src->need_changed_caps = FALSE;
+ src->is_flexible_tensors = FALSE;
/* Filling the buffer should be pending until set_caps() */
gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
g_free (src->json_filename);
g_free (src->tensors_seq_str);
+ /* close the file */
+ if (src->fd) {
+ g_close (src->fd, NULL);
+ src->fd = 0;
+ }
+
+ g_object_unref (src->parser);
+
if (src->shuffled_index_array)
g_array_free (src->shuffled_index_array, TRUE);
static GstFlowReturn
gst_data_repo_src_read_tensors (GstDataRepoSrc * src, GstBuffer ** buffer)
{
+ GstFlowReturn ret = GST_FLOW_OK;
guint i = 0, seq_idx = 0;
GstBuffer *buf;
guint to_read, byte_read;
- int ret;
+ int read_size;
guint8 *data;
- GstMemory *mem[MAX_ITEM] = { 0, };
- GstMapInfo info[MAX_ITEM];
+ GstMemory *mem = NULL;
+ GstMapInfo info;
guint shuffled_index = 0;
guint64 sample_offset = 0;
guint64 offset = 0; /* offset from 0 */
for (i = 0; i < src->tensors_seq_cnt; i++) {
seq_idx = src->tensors_seq[i];
- mem[i] = gst_allocator_alloc (NULL, src->tensors_size[seq_idx], NULL);
+ mem = gst_allocator_alloc (NULL, src->tensors_size[seq_idx], NULL);
- if (!gst_memory_map (mem[i], &info[i], GST_MAP_WRITE)) {
+ if (!gst_memory_map (mem, &info, GST_MAP_WRITE)) {
GST_ERROR_OBJECT (src, "Could not map GstMemory[%d]", i);
+ ret = GST_FLOW_ERROR;
goto error;
}
if user sets "tensor-sequence=2,1", datareposrc read offset 9528 then 9488.
*/
- data = info[i].data;
+ data = info.data;
byte_read = 0;
to_read = src->tensors_size[seq_idx];
offset = sample_offset + src->tensors_offset[seq_idx];
- src->offset = lseek (src->fd, offset, SEEK_SET);
+ src->fd_offset = lseek (src->fd, offset, SEEK_SET);
while (to_read > 0) {
GST_LOG_OBJECT (src,
"Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x (%d size)",
- to_read, src->offset + byte_read, (guint) src->offset + byte_read);
+ to_read, src->fd_offset + byte_read,
+ (guint) src->fd_offset + byte_read);
errno = 0;
- ret = read (src->fd, data + byte_read, to_read);
- GST_LOG_OBJECT (src, "Read: %d", ret);
- if (ret < 0) {
+ read_size = read (src->fd, data + byte_read, to_read);
+ GST_LOG_OBJECT (src, "Read: %d", read_size);
+ if (read_size < 0) {
if (errno == EAGAIN || errno == EINTR)
continue;
- goto could_not_read;
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ ret = GST_FLOW_ERROR;
+ goto error;
}
/* files should eos if they read 0 and more was requested */
- if (ret == 0) {
+ if (read_size == 0) {
/* .. but first we should return any remaining data */
if (byte_read > 0)
break;
- goto eos;
+ GST_DEBUG ("EOS");
+ ret = GST_FLOW_EOS;
+ goto error;
}
- to_read -= ret;
- byte_read += ret;
+ to_read -= read_size;
+ byte_read += read_size;
- src->read_position += ret;
- src->offset += ret;
+ src->read_position += read_size;
+ src->fd_offset += read_size;
}
- if (mem[i])
- gst_memory_unmap (mem[i], &info[i]);
-
- gst_buffer_append_memory (buf, mem[i]);
+ gst_memory_unmap (mem, &info);
+ gst_buffer_append_memory (buf, mem);
}
*buffer = buf;
return GST_FLOW_OK;
-could_not_read:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- gst_memory_unmap (mem[0], &info[0]);
- gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
+error:
+ if (mem) {
+ gst_memory_unmap (mem, &info);
+ gst_memory_unref (mem);
}
-eos:
- {
- GST_DEBUG ("EOS");
- gst_memory_unmap (mem[0], &info[0]);
- gst_buffer_unref (buf);
- return GST_FLOW_EOS;
+
+ gst_buffer_unref (buf);
+
+ return ret;
+}
+
+/**
+ * @brief Function to get num_tensors from tensor_count_array
+ */
+static guint
+gst_data_repo_src_get_num_tensors (GstDataRepoSrc * src, guint shuffled_index)
+{
+ guint num_tensors = 0;
+ guint cur_idx_tensor_cnt = 0;
+ guint next_idx_tensor_cnt = 0;
+
+ g_return_val_if_fail (src != NULL, 0);
+
+ cur_idx_tensor_cnt =
+ json_array_get_int_element (src->tensor_count_array, shuffled_index);
+ GST_DEBUG_OBJECT (src, "cur_idx_tensor_cnt:%u", cur_idx_tensor_cnt);
+
+ if (shuffled_index + 1 == src->tensor_count_array_len) {
+ next_idx_tensor_cnt = src->tensor_size_array_len;
+ } else {
+ next_idx_tensor_cnt =
+ json_array_get_int_element (src->tensor_count_array,
+ shuffled_index + 1);
+ }
+ GST_DEBUG_OBJECT (src, "next_idx_tensor_cnt:%u", next_idx_tensor_cnt);
+
+ num_tensors = next_idx_tensor_cnt - cur_idx_tensor_cnt;
+ GST_DEBUG_OBJECT (src, "num_tensors:%u", num_tensors);
+
+ return num_tensors;
+}
+
+/**
+ * @brief Function to read tensors
+ */
+static GstFlowReturn
+gst_data_repo_src_read_flexible_tensors (GstDataRepoSrc * src,
+ GstBuffer ** buffer)
+{
+ GstFlowReturn ret = GST_FLOW_OK;
+ guint i;
+ guint shuffled_index = 0;
+ gint64 sample_offset;
+ guint num_tensors = 0;
+ GstBuffer *buf;
+ GstMemory *mem;
+ GstMapInfo info;
+ GstTensorMetaInfo meta;
+ guint to_read, byte_read;
+ int read_size;
+ guint8 *data;
+ guint tensor_count;
+ guint tensor_size;
+
+ g_return_val_if_fail (src->fd != 0, GST_FLOW_ERROR);
+ g_return_val_if_fail (src->shuffled_index_array != NULL, GST_FLOW_ERROR);
+
+ if (gst_data_repo_src_epoch_is_done (src)) {
+ if (src->epochs == 0) {
+ GST_LOG_OBJECT (src, "send EOS");
+ return GST_FLOW_EOS;
+ }
+ if (src->is_shuffle)
+ gst_data_repo_src_shuffle_samples_index (src);
+ }
+
+ /* only do for first epoch */
+ if (!src->first_epoch_is_done) {
+ /* append samples index to array */
+ g_array_append_val (src->shuffled_index_array, src->current_sample_index);
+ src->current_sample_index++;
+ }
+
+ shuffled_index =
+ g_array_index (src->shuffled_index_array, guint, src->array_index++);
+ GST_LOG_OBJECT (src, "shuffled_index [%d] -> %d", src->array_index - 1,
+ shuffled_index);
+
+ /* sample offset from 0 */
+ sample_offset =
+ json_array_get_int_element (src->sample_offset_array, shuffled_index);
+ GST_LOG_OBJECT (src, "sample offset 0x%" G_GINT64_MODIFIER "x (%d size)",
+ sample_offset, (guint) sample_offset);
+
+ src->fd_offset = lseek (src->fd, sample_offset, SEEK_SET);
+
+ buf = gst_buffer_new ();
+
+ tensor_count =
+ json_array_get_int_element (src->tensor_count_array, shuffled_index);
+ num_tensors = gst_data_repo_src_get_num_tensors (src, shuffled_index);
+
+ for (i = 0; i < num_tensors; i++) {
+ tensor_size =
+ json_array_get_int_element (src->tensor_size_array, tensor_count + i);
+ mem = gst_allocator_alloc (NULL, tensor_size, NULL);
+
+ if (!gst_memory_map (mem, &info, GST_MAP_WRITE)) {
+ GST_ERROR_OBJECT (src, "Could not map GstMemory[%d]", i);
+ ret = GST_FLOW_ERROR;
+ goto error;
+ }
+
+ data = info.data;
+ byte_read = 0;
+ to_read = tensor_size;
+ while (to_read > 0) {
+ GST_LOG_OBJECT (src,
+ "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)",
+ to_read, src->fd_offset + byte_read, (long long) src->fd_offset);
+ errno = 0;
+ read_size = read (src->fd, data + byte_read, to_read);
+ GST_LOG_OBJECT (src, "Read: %d", read_size);
+ if (read_size < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ continue;
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ ret = GST_FLOW_ERROR;
+ goto error;
+ }
+ /* files should eos if they read 0 and more was requested */
+ if (read_size == 0) {
+ /* .. but first we should return any remaining data */
+ if (byte_read > 0)
+ break;
+ GST_DEBUG ("EOS");
+ ret = GST_FLOW_EOS;
+ goto error;
+ }
+ to_read -= read_size;
+ byte_read += read_size;
+
+ src->read_position += read_size;
+ src->fd_offset += read_size;
+ }
+
+ /* check invalid flexible tensor */
+ if (!gst_tensor_meta_info_parse_header (&meta, info.data)) {
+ GST_ERROR_OBJECT (src, "Invalid flexible tensors");
+ ret = GST_FLOW_ERROR;
+ goto error;
+ }
+
+ gst_memory_unmap (mem, &info);
+ gst_buffer_append_memory (buf, mem);
}
+
+ *buffer = buf;
+
+ return GST_FLOW_OK;
+
error:
+ if (mem) {
+ gst_memory_unmap (mem, &info);
+ gst_memory_unref (mem);
+ }
gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
+
+ return ret;
}
/**
gchar *data;
gchar *filename;
GstBuffer *buf;
- gboolean ret;
+ gboolean read_size;
GError *error = NULL;
g_return_val_if_fail (src->shuffled_index_array != NULL, GST_FLOW_ERROR);
src->array_index++;
/* Try to read one image */
- ret = g_file_get_contents (filename, &data, &size, &error);
- if (!ret) {
+ read_size = g_file_get_contents (filename, &data, &size, &error);
+ if (!read_size) {
if (src->successful_read) {
/* If we've read at least one buffer successfully, not finding the next file is EOS. */
g_free (filename);
static GstFlowReturn
gst_data_repo_src_read_others (GstDataRepoSrc * src, GstBuffer ** buffer)
{
+ GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf;
guint to_read, byte_read;
- int ret;
+ int read_size;
guint8 *data;
GstMemory *mem;
GstMapInfo info;
GST_LOG_OBJECT (src, "shuffled_index [%d] -> %d", src->array_index - 1,
shuffled_index);
offset = gst_data_repo_src_get_file_offset (src, shuffled_index);
- src->offset = lseek (src->fd, offset, SEEK_SET);
+ src->fd_offset = lseek (src->fd, offset, SEEK_SET);
mem = gst_allocator_alloc (NULL, src->sample_size, NULL);
if (!gst_memory_map (mem, &info, GST_MAP_WRITE)) {
GST_ERROR_OBJECT (src, "Could not map GstMemory");
- goto error;
+ return GST_FLOW_ERROR;
}
data = info.data;
to_read = src->sample_size;
while (to_read > 0) {
GST_LOG_OBJECT (src, "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x",
- to_read, src->offset + byte_read);
+ to_read, src->fd_offset + byte_read);
errno = 0;
- ret = read (src->fd, data + byte_read, to_read);
- GST_LOG_OBJECT (src, "Read: %d", ret);
- if (ret < 0) {
+ read_size = read (src->fd, data + byte_read, to_read);
+ GST_LOG_OBJECT (src, "Read: %d", read_size);
+ if (read_size < 0) {
if (errno == EAGAIN || errno == EINTR)
continue;
- goto could_not_read;
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+ ret = GST_FLOW_ERROR;
+ goto error;
}
/* files should eos if they read 0 and more was requested */
- if (ret == 0) {
+ if (read_size == 0) {
/* .. but first we should return any remaining data */
if (byte_read > 0)
break;
- goto eos;
+ GST_DEBUG ("EOS");
+ ret = GST_FLOW_EOS;
+ goto error;
}
- to_read -= ret;
- byte_read += ret;
+ to_read -= read_size;
+ byte_read += read_size;
- src->read_position += ret;
- src->offset += ret;
+ src->read_position += read_size;
+ src->fd_offset += read_size;
}
- if (mem)
- gst_memory_unmap (mem, &info);
+ gst_memory_unmap (mem, &info);
buf = gst_buffer_new ();
gst_buffer_append_memory (buf, mem);
*buffer = buf;
return GST_FLOW_OK;
-could_not_read:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
- gst_memory_unmap (mem, &info);
- return GST_FLOW_ERROR;
- }
-eos:
- {
- GST_DEBUG ("EOS");
+error:
+ if (mem) {
gst_memory_unmap (mem, &info);
- return GST_FLOW_EOS;
+ gst_memory_unref (mem);
}
-error:
- return GST_FLOW_ERROR;
+
+ return ret;
}
/**
if (fstat (src->fd, &stat_results) < 0)
goto no_stat;
+ src->file_size = stat_results.st_size;
+ if (src->file_size == 0)
+ goto error_close;
+
if (S_ISDIR (stat_results.st_mode))
goto was_directory;
src->last_offset =
gst_data_repo_src_get_file_offset (src, src->stop_sample_index);
- src->offset = lseek (src->fd, src->start_offset, SEEK_SET);
+ src->fd_offset = lseek (src->fd, src->start_offset, SEEK_SET);
GST_LOG_OBJECT (src, "Start file offset 0x%" G_GINT64_MODIFIER "x",
- src->offset);
+ src->fd_offset);
}
g_free (filename);
case GST_DATA_REPO_DATA_OCTET:
return gst_data_repo_src_read_others (src, buffer);
case GST_DATA_REPO_DATA_TENSOR:
- return gst_data_repo_src_read_tensors (src, buffer);
+ if (src->is_flexible_tensors)
+ return gst_data_repo_src_read_flexible_tensors (src, buffer);
+ else
+ return gst_data_repo_src_read_tensors (src, buffer);
case GST_DATA_REPO_DATA_IMAGE:
return gst_data_repo_src_read_multi_images (src, buffer);
default:
static gboolean
gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps)
{
+ GstStructure *s;
+ const GValue *v;
+ const gchar *format;
+
g_return_val_if_fail (src != NULL, FALSE);
g_return_val_if_fail (caps != NULL, FALSE);
break;
case GST_DATA_REPO_DATA_TENSOR:
src->sample_size = gst_data_repo_src_get_tensors_size (src, caps);
+ s = gst_caps_get_structure (caps, 0);
+ v = gst_structure_get_value (s, "format");
+ format = g_value_get_string (v);
+ if (g_strcmp0 (format, "flexible") == 0)
+ src->is_flexible_tensors = TRUE;
break;
default:
break;
GError *error = NULL;
GFile *file;
gchar *contents;
- JsonParser *parser;
JsonNode *root;
JsonObject *object;
const gchar *caps_str = NULL;
return FALSE;
}
- parser = json_parser_new ();
- if (!json_parser_load_from_data (parser, contents, -1, NULL)) {
+
+ src->parser = json_parser_new ();
+
+ if (!json_parser_load_from_data (src->parser, contents, -1, NULL)) {
GST_ERROR_OBJECT (src, "Failed to load data from %s", src->json_filename);
goto error;
}
- root = json_parser_get_root (parser);
+ root = json_parser_get_root (src->parser);
if (!JSON_NODE_HOLDS_OBJECT (root)) {
GST_ERROR_OBJECT (src, "it does not contain a JsonObject: %s", contents);
goto error;
GST_INFO_OBJECT (src, "sample_size: %d", src->sample_size);
}
- if (src->sample_size == 0)
- goto error;
+ if (src->is_flexible_tensors) {
+ if (!json_object_has_member (object, "sample_offset")) {
+ GST_ERROR_OBJECT (src, "There is not sample_offset field: %s", contents);
+ goto error;
+ }
+ src->sample_offset_array =
+ json_object_get_array_member (object, "sample_offset");
+ src->sample_offset_array_len =
+ json_array_get_length (src->sample_offset_array);
+
+ if (!json_object_has_member (object, "tensor_size")) {
+ GST_ERROR_OBJECT (src, "There is not tensor_size field: %s", contents);
+ goto error;
+ }
+ src->tensor_size_array =
+ json_object_get_array_member (object, "tensor_size");
+ src->tensor_size_array_len = json_array_get_length (src->tensor_size_array);
+ GST_INFO_OBJECT (src, "tensor_size_array_len:%u",
+ src->tensor_size_array_len);
+
+ if (!json_object_has_member (object, "tensor_count")) {
+ GST_ERROR_OBJECT (src, "There is not tensor_count field: %s", contents);
+ goto error;
+ }
+ src->tensor_count_array =
+ json_object_get_array_member (object, "tensor_count");
+ src->tensor_count_array_len =
+ json_array_get_length (src->tensor_count_array);
+ GST_INFO_OBJECT (src, "tensor_count_array_len:%u",
+ src->tensor_count_array_len);
+ } else {
+ if (src->sample_size == 0)
+ goto error;
+ }
if (!json_object_has_member (object, "total_samples")) {
GST_ERROR_OBJECT (src, "There is not total_samples field: %s", contents);
goto error;
g_free (contents);
- g_object_unref (parser);
g_object_unref (file);
return TRUE;
-
error:
src->data_type = GST_DATA_REPO_DATA_UNKNOWN;
GST_ERROR_OBJECT (src, "Failed to parse %s", src->json_filename);
g_free (contents);
- g_object_unref (parser);
g_object_unref (file);
return FALSE;
/** if data_type is not GST_DATA_REPO_DATA_UNKNOWN and sample_size is 0 then
'caps' is set by property and sample size needs to be set by blocksize
(in the case of otect and text) */
- if (src->sample_size == 0) {
+ if (src->sample_size == 0 && !src->is_flexible_tensors) {
basesrc = GST_BASE_SRC (src);
g_object_get (G_OBJECT (basesrc), "blocksize", &blocksize, NULL);
GST_DEBUG_OBJECT (src, "blocksize = %d", blocksize);
{
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
+ case GST_MESSAGE_ERROR:
g_main_loop_quit ((GMainLoop *) data);
break;
default:
}
/**
- * @brief create video test file
+ * @brief Callback for tensor sink signal.
*/
static void
-create_video_test_file ()
+new_data_cb (GstElement *element, GstBuffer *buffer, gint *user_data)
+{
+ (*user_data)++;
+ g_warning ("count:%d", *user_data);
+ return;
+}
+
+/**
+ * @brief create flexible tensors file
+ */
+static void
+create_flexible_tensors_test_file ()
{
GstBus *bus;
GMainLoop *loop;
+ const gchar *str_pipeline
+ = "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+ "video/x-raw,format=RGB,width=176,height=144,framerate=10/1 ! tensor_converter ! join0.sink_0 "
+ "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+ "video/x-raw,format=RGB,width=320,height=240,framerate=10/1 ! tensor_converter ! join0.sink_1 "
+ "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+ "video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tensor_converter ! join0.sink_2 "
+ "join name=join0 ! other/tensors,format=flexible ! "
+ "datareposink location=flexible.data json=flexible.json";
+
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+}
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=10 ! "
- "datareposink location=video1.raw json=video1.json");
+/**
+ * @brief create video test file
+ */
+static void
+create_video_test_file ()
+{
+ GstBus *bus;
+ GMainLoop *loop;
+ const gchar *str_pipeline = "videotestsrc num-buffers=10 ! "
+ "datareposink location=video1.raw json=video1.json";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
+ loop = g_main_loop_new (NULL, FALSE);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
ASSERT_NE (bus, nullptr);
gst_bus_add_watch (bus, bus_callback, loop);
{
GstBus *bus;
GMainLoop *loop;
-
- loop = g_main_loop_new (NULL, FALSE);
-
- gchar *str_pipeline = g_strdup (
- "gst-launch-1.0 audiotestsrc samplesperbuffer=44100 num-buffers=1 ! "
- "audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! "
- "datareposink location=audio1.raw json=audio1.json");
+ const gchar *str_pipeline
+ = "audiotestsrc samplesperbuffer=44100 num-buffers=1 ! "
+ "audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! "
+ "datareposink location=audio1.raw json=audio1.json";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
+ loop = g_main_loop_new (NULL, FALSE);
ASSERT_NE (pipeline, nullptr);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
{
GstBus *bus;
GMainLoop *loop;
-
- loop = g_main_loop_new (NULL, FALSE);
-
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! "
- "datareposink location=img_%02d.png json=img.json");
+ const gchar *str_pipeline = "videotestsrc num-buffers=5 ! pngenc ! "
+ "datareposink location=img_%02d.png json=img.json";
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
+ loop = g_main_loop_new (NULL, FALSE);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
ASSERT_NE (bus, nullptr);
gst_bus_add_watch (bus, bus_callback, loop);
*/
TEST (datareposrc, readImageFiles)
{
+ gchar *filename;
+ gint buffer_count = 0, i;
+ GCallback handler = G_CALLBACK (new_data_cb);
+ GstElement *tensor_sink;
GstBus *bus;
GMainLoop *loop;
+ const gchar *str_pipeline
+ = "datareposrc location=img_%02d.png json=img.json start-sample-index=0 stop-sample-index=4 !"
+ "pngdec ! tensor_converter ! tensor_sink name=tensor_sink0";
create_image_test_file ();
- loop = g_main_loop_new (NULL, FALSE);
-
- gchar *str_pipeline = g_strdup (
- "gst-launch-1.0 datareposrc location=img_%02d.png json=img.json "
- "start-sample-index=0 stop-sample-index=4 ! pngdec ! fakesink");
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
+ tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+ ASSERT_NE (tensor_sink, nullptr);
+
+ g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+ loop = g_main_loop_new (NULL, FALSE);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
ASSERT_NE (bus, nullptr);
gst_bus_add_watch (bus, bus_callback, loop);
gst_object_unref (bus);
- EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
-
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
g_main_loop_run (loop);
setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+ EXPECT_NE (buffer_count, 0);
+ handler = NULL;
+
+ gst_object_unref (tensor_sink);
gst_object_unref (pipeline);
g_main_loop_unref (loop);
+
+ for (i = 0; i < 5; i++) {
+ filename = g_strdup_printf ("img_%02d.png", i);
+ g_remove (filename);
+ g_free (filename);
+ }
}
/**
*/
TEST (datareposrc, readVideoRaw)
{
+ gint buffer_count = 0;
+ GstElement *tensor_sink;
GstBus *bus;
GMainLoop *loop;
+ GCallback handler = G_CALLBACK (new_data_cb);
+ const gchar *str_pipeline
+ = "datareposrc location=video1.raw json=video1.json ! tensor_converter ! tensor_sink name=tensor_sink0";
create_video_test_file ();
- loop = g_main_loop_new (NULL, FALSE);
-
- gchar *str_pipeline = g_strdup (
- "gst-launch-1.0 datareposrc location=video1.raw json=video1.json ! fakesink");
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
+ tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+ ASSERT_NE (tensor_sink, nullptr);
+
+ g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+ loop = g_main_loop_new (NULL, FALSE);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
ASSERT_NE (bus, nullptr);
gst_bus_add_watch (bus, bus_callback, loop);
gst_object_unref (bus);
- EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
-
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
g_main_loop_run (loop);
setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ EXPECT_NE (buffer_count, 0);
+ handler = NULL;
+
+ gst_object_unref (tensor_sink);
gst_object_unref (pipeline);
g_main_loop_unref (loop);
+
+ g_remove ("video1.json");
+ g_remove ("video1.raw");
}
/**
*/
TEST (datareposrc, readAudioRaw)
{
+ gint buffer_count = 0;
+ GstElement *tensor_sink;
GstBus *bus;
GMainLoop *loop;
+ GCallback handler = G_CALLBACK (new_data_cb);
+ const gchar *str_pipeline
+ = "datareposrc location=audio1.raw json=audio1.json ! tensor_converter ! tensor_sink name=tensor_sink0";
create_audio_test_file ();
- loop = g_main_loop_new (NULL, FALSE);
-
- gchar *str_pipeline = g_strdup (
- "gst-launch-1.0 datareposrc location=audio1.raw json=audio1.json ! fakesink");
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
+ tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+ ASSERT_NE (tensor_sink, nullptr);
+
+ g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+ loop = g_main_loop_new (NULL, FALSE);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
ASSERT_NE (bus, nullptr);
gst_bus_add_watch (bus, bus_callback, loop);
gst_object_unref (bus);
- EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
-
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
g_main_loop_run (loop);
setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+ EXPECT_NE (buffer_count, 0);
+ handler = NULL;
+
+ gst_object_unref (tensor_sink);
gst_object_unref (pipeline);
g_main_loop_unref (loop);
+
+ g_remove ("audio1.json");
+ g_remove ("audio1.raw");
}
/**
TEST (datareposrc, invalidJsonPath0_n)
{
GstElement *datareposrc = NULL;
+ const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
TEST (datareposrc, invalidJsonPath1_n)
{
GstElement *datareposrc = NULL;
+ const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
TEST (datareposrc, invalidFilePath0_n)
{
GstElement *datareposrc = NULL;
+ const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
TEST (datareposrc, invalidFilePath1_n)
{
GstElement *datareposrc = NULL;
+ const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
TEST (datareposrc, invalidCapsWithoutJSON_n)
{
GstElement *datareposrc = NULL;
+ const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
- gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
- g_free (str_pipeline);
ASSERT_NE (pipeline, nullptr);
datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
file_path = get_file_path (filename);
json_path = get_file_path (json);
- gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
- "start-sample-index=0 stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! "
- "fakesink",
+ gchar *str_pipeline = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+ "start-sample-index=0 stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! "
+ "fakesink",
file_path, json_path);
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
g_free (str_pipeline);
g_main_loop_run (loop);
- setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT), 0);
gst_object_unref (pipeline);
g_main_loop_unref (loop);
}
/**
+ * @brief Test for reading a file composed of flexible tensors
+ * the default shuffle is TRUE.
+ */
+TEST (datareposrc, readFlexibleTensors)
+{
+ gint buffer_count = 0;
+ GstElement *tensor_sink;
+ GstBus *bus;
+ const gchar *str_pipeline = NULL;
+ GMainLoop *loop;
+ GCallback handler = G_CALLBACK (new_data_cb);
+ str_pipeline = "datareposrc location=flexible.data json=flexible.json ! tensor_sink name=tensor_sink0";
+
+ create_flexible_tensors_test_file ();
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
+
+ tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+ ASSERT_NE (tensor_sink, nullptr);
+
+ g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ EXPECT_NE (buffer_count, 0);
+ handler = NULL;
+
+ gst_object_unref (tensor_sink);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
+
+ g_remove ("flexible.json");
+ g_remove ("flexible.data");
+}
+
+/**
* @brief Test for reading a tensors file with Caps property
*/
TEST (datareposrc, readTensorsNoJSONWithCapsParam)
file_path = get_file_path (filename);
gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc name=datareposrc location=%s "
+ "datareposrc name=datareposrc location=%s "
"start-sample-index=0 stop-sample-index=9 epochs=2 tensors-sequence=0,1 "
"caps =\"other/tensors, format=(string)static, framerate=(fraction)0/1, "
"num_tensors=(int)2, dimensions=(string)1:1:784:1.1:1:10:1, types=(string)float32.float32\" ! "
g_main_loop_run (loop);
- setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+ EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT), 0);
gst_object_unref (pipeline);
g_main_loop_unref (loop);
file_path = get_file_path (filename);
json_path = get_file_path (json);
- gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
- "stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! fakesink",
- file_path, json_path);
+ gchar *str_pipeline
+ = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+ "stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! fakesink",
+ file_path, json_path);
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
g_free (str_pipeline);
g_free (file_path);
file_path = get_file_path (filename);
json_path = get_file_path (json);
- gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
- "stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! fakesink",
- file_path, json_path);
+ gchar *str_pipeline
+ = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+ "stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! fakesink",
+ file_path, json_path);
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
g_free (str_pipeline);
g_free (file_path);
file_path = get_file_path (filename);
json_path = get_file_path (json);
- gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
- "start-sample-index=0 epochs=2 tensors-sequence=0,1 ! fakesink",
- file_path, json_path);
+ gchar *str_pipeline
+ = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+ "start-sample-index=0 epochs=2 tensors-sequence=0,1 ! fakesink",
+ file_path, json_path);
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
g_free (str_pipeline);
g_free (file_path);
file_path = get_file_path (filename);
json_path = get_file_path (json);
- gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
- "start-sample-index=0 epochs=2 tensors-sequence=0,1 ! fakesink",
- file_path, json_path);
+ gchar *str_pipeline
+ = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+ "start-sample-index=0 epochs=2 tensors-sequence=0,1 ! fakesink",
+ file_path, json_path);
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
g_free (str_pipeline);
g_free (file_path);
json_path = get_file_path (json);
gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
+ "datareposrc name=datareposrc location=%s json=%s "
"start-sample-index=0 stop-sample-index=9 tensors-sequence=0,1 ! fakesink",
file_path, json_path);
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
json_path = get_file_path (json);
gchar *str_pipeline = g_strdup_printf (
- "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
+ "datareposrc name=datareposrc location=%s json=%s "
"start-sample-index=0 stop-sample-index=9 tensors-sequence=0,1 ! fakesink",
file_path, json_path);
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
json_path = get_file_path (json);
gchar *str_pipeline
- = g_strdup_printf ("gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
+ = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
"start-sample-index=0 stop-sample-index=9 ! fakesink",
file_path, json_path);
GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
}
/**
- * @brief remove test file
+ * @brief Test for reading a file composed of non-flexible tensors
+ * the default shuffle is TRUE.
*/
-static void
-remove_test_file (void)
+TEST (datareposrc, readInvalidFlexibleTensors)
{
- gchar *filename = NULL;
- int i;
+ gint buffer_count = 0;
+ GstBus *bus;
+ GMainLoop *loop;
+ GCallback handler = G_CALLBACK (new_data_cb);
+ const gchar *str_pipeline
+ = "datareposrc location=audio1.raw json=flexible.json ! tensor_sink name=tensor_sink0";
+ GstElement *tensor_sink;
+
+ create_flexible_tensors_test_file ();
+ create_audio_test_file ();
+
+ GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+ ASSERT_NE (pipeline, nullptr);
+
+ tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+ ASSERT_NE (tensor_sink, nullptr);
+
+ g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+ loop = g_main_loop_new (NULL, FALSE);
+ bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+ ASSERT_NE (bus, nullptr);
+ gst_bus_add_watch (bus, bus_callback, loop);
+ gst_object_unref (bus);
+
+ /* EXPECT_EQ not checked due to internal data stream error */
+ setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+ g_main_loop_run (loop);
+
+ setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+ /* Internal data stream error */
+ EXPECT_EQ (buffer_count, 0);
+ handler = NULL;
+
+ gst_object_unref (tensor_sink);
+ gst_object_unref (pipeline);
+ g_main_loop_unref (loop);
g_remove ("audio1.json");
g_remove ("audio1.raw");
- g_remove ("video1.json");
- g_remove ("video1.raw");
- g_remove ("img.son");
-
- for (i = 0; i < 5; i++) {
- filename = g_strdup_printf ("img_%02d.png", i);
- g_remove (filename);
- g_free (filename);
- }
+ g_remove ("flexible.json");
+ g_remove ("flexible.data");
}
/**
g_warning ("catch `testing::internal::GoogleTestFailureException`");
}
- remove_test_file ();
-
return result;
}