[datareposrc] Add function to read flexible tensors
authorhyunil park <hyunil46.park@samsung.com>
Tue, 27 Jun 2023 04:13:41 +0000 (13:13 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 19 Jul 2023 08:00:32 +0000 (17:00 +0900)
- If the tensor format of the caps of the JSON file is flexible,
  Reads a sample using the flexible meta information (sample_offset, tensor_size, tensor_count)
  of the JSON file and appends memory to GstBuffer as many as the number of flexible tensors.
- Add unit test
- Add checking if it is a flexible tensor after writing data to GstMemory

- Reference
  * The start offset for reading is sample_offset(sample size).
  * Save each flexible tensor stored in a sample to a gstbuffer according to each
    tensor_size, tensor_size can get tensor_size field in JSON file.
  * A shuffled index is mapped to an index of sample_offset field in JSON file.
  * A shuffled index is also mapped to an index of tensor_count field in JSON file
  * The index value is a number of cumulative tensors, so it is mapped to an index
    of tensor_size field in JSON file

Signed-off-by: hyunil park <hyunil46.park@samsung.com>
gst/datarepo/gstdatareposrc.c
gst/datarepo/gstdatareposrc.h
tests/nnstreamer_datarepo/unittest_datareposrc.cc

index f083b6b..9816d54 100644 (file)
@@ -47,7 +47,6 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <inttypes.h>
-#include <json-glib/json-glib.h>
 #include "gstdatareposrc.h"
 
 #define struct_stat struct stat
@@ -226,9 +225,10 @@ gst_data_repo_src_init (GstDataRepoSrc * src)
   src->filename = NULL;
   src->json_filename = NULL;
   src->tensors_seq_str = NULL;
+  src->file_size = 0;
   src->fd = 0;
   src->data_type = GST_DATA_REPO_DATA_UNKNOWN;
-  src->offset = 0;
+  src->fd_offset = 0;
   src->start_offset = 0;
   src->last_offset = 0;
   src->successful_read = FALSE;
@@ -239,6 +239,12 @@ gst_data_repo_src_init (GstDataRepoSrc * src)
   src->epochs = DEFAULT_EPOCHS;
   src->shuffled_index_array = g_array_new (FALSE, FALSE, sizeof (guint));
   src->array_index = 0;
+  src->sample_offset_array = NULL;
+  src->sample_offset_array_len = 0;
+  src->tensor_size_array = NULL;
+  src->tensor_size_array_len = 0;
+  src->tensor_count_array = NULL;
+  src->tensor_count_array_len = 0;
   src->first_epoch_is_done = FALSE;
   src->is_shuffle = DEFAULT_IS_SHUFFLE;
   src->num_samples = 0;
@@ -247,6 +253,7 @@ gst_data_repo_src_init (GstDataRepoSrc * src)
   src->caps = NULL;
   src->sample_size = 0;
   src->need_changed_caps = FALSE;
+  src->is_flexible_tensors = FALSE;
 
   /* Filling the buffer should be pending until set_caps() */
   gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
@@ -265,6 +272,14 @@ gst_data_repo_src_finalize (GObject * object)
   g_free (src->json_filename);
   g_free (src->tensors_seq_str);
 
+  /* close the file */
+  if (src->fd) {
+    g_close (src->fd, NULL);
+    src->fd = 0;
+  }
+
+  g_object_unref (src->parser);
+
   if (src->shuffled_index_array)
     g_array_free (src->shuffled_index_array, TRUE);
 
@@ -456,13 +471,14 @@ gst_data_repo_src_epoch_is_done (GstDataRepoSrc * src)
 static GstFlowReturn
 gst_data_repo_src_read_tensors (GstDataRepoSrc * src, GstBuffer ** buffer)
 {
+  GstFlowReturn ret = GST_FLOW_OK;
   guint i = 0, seq_idx = 0;
   GstBuffer *buf;
   guint to_read, byte_read;
-  int ret;
+  int read_size;
   guint8 *data;
-  GstMemory *mem[MAX_ITEM] = { 0, };
-  GstMapInfo info[MAX_ITEM];
+  GstMemory *mem = NULL;
+  GstMapInfo info;
   guint shuffled_index = 0;
   guint64 sample_offset = 0;
   guint64 offset = 0;           /* offset from 0 */
@@ -499,10 +515,11 @@ gst_data_repo_src_read_tensors (GstDataRepoSrc * src, GstBuffer ** buffer)
 
   for (i = 0; i < src->tensors_seq_cnt; i++) {
     seq_idx = src->tensors_seq[i];
-    mem[i] = gst_allocator_alloc (NULL, src->tensors_size[seq_idx], NULL);
+    mem = gst_allocator_alloc (NULL, src->tensors_size[seq_idx], NULL);
 
-    if (!gst_memory_map (mem[i], &info[i], GST_MAP_WRITE)) {
+    if (!gst_memory_map (mem, &info, GST_MAP_WRITE)) {
       GST_ERROR_OBJECT (src, "Could not map GstMemory[%d]", i);
+      ret = GST_FLOW_ERROR;
       goto error;
     }
 
@@ -526,66 +543,221 @@ gst_data_repo_src_read_tensors (GstDataRepoSrc * src, GstBuffer ** buffer)
       if user sets "tensor-sequence=2,1", datareposrc read offset 9528 then 9488.
     */
 
-    data = info[i].data;
+    data = info.data;
 
     byte_read = 0;
     to_read = src->tensors_size[seq_idx];
     offset = sample_offset + src->tensors_offset[seq_idx];
-    src->offset = lseek (src->fd, offset, SEEK_SET);
+    src->fd_offset = lseek (src->fd, offset, SEEK_SET);
 
     while (to_read > 0) {
       GST_LOG_OBJECT (src,
           "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x (%d size)",
-          to_read, src->offset + byte_read, (guint) src->offset + byte_read);
+          to_read, src->fd_offset + byte_read,
+          (guint) src->fd_offset + byte_read);
       errno = 0;
-      ret = read (src->fd, data + byte_read, to_read);
-      GST_LOG_OBJECT (src, "Read: %d", ret);
-      if (ret < 0) {
+      read_size = read (src->fd, data + byte_read, to_read);
+      GST_LOG_OBJECT (src, "Read: %d", read_size);
+      if (read_size < 0) {
         if (errno == EAGAIN || errno == EINTR)
           continue;
-        goto could_not_read;
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+        ret = GST_FLOW_ERROR;
+        goto error;
       }
       /* files should eos if they read 0 and more was requested */
-      if (ret == 0) {
+      if (read_size == 0) {
         /* .. but first we should return any remaining data */
         if (byte_read > 0)
           break;
-        goto eos;
+        GST_DEBUG ("EOS");
+        ret = GST_FLOW_EOS;
+        goto error;
       }
-      to_read -= ret;
-      byte_read += ret;
+      to_read -= read_size;
+      byte_read += read_size;
 
-      src->read_position += ret;
-      src->offset += ret;
+      src->read_position += read_size;
+      src->fd_offset += read_size;
     }
 
-    if (mem[i])
-      gst_memory_unmap (mem[i], &info[i]);
-
-    gst_buffer_append_memory (buf, mem[i]);
+    gst_memory_unmap (mem, &info);
+    gst_buffer_append_memory (buf, mem);
   }
 
   *buffer = buf;
 
   return GST_FLOW_OK;
 
-could_not_read:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    gst_memory_unmap (mem[0], &info[0]);
-    gst_buffer_unref (buf);
-    return GST_FLOW_ERROR;
+error:
+  if (mem) {
+    gst_memory_unmap (mem, &info);
+    gst_memory_unref (mem);
   }
-eos:
-  {
-    GST_DEBUG ("EOS");
-    gst_memory_unmap (mem[0], &info[0]);
-    gst_buffer_unref (buf);
-    return GST_FLOW_EOS;
+
+  gst_buffer_unref (buf);
+
+  return ret;
+}
+
+/**
+ * @brief Function to get num_tensors from tensor_count_array
+ */
+static guint
+gst_data_repo_src_get_num_tensors (GstDataRepoSrc * src, guint shuffled_index)
+{
+  guint num_tensors = 0;
+  guint cur_idx_tensor_cnt = 0;
+  guint next_idx_tensor_cnt = 0;
+
+  g_return_val_if_fail (src != NULL, 0);
+
+  cur_idx_tensor_cnt =
+      json_array_get_int_element (src->tensor_count_array, shuffled_index);
+  GST_DEBUG_OBJECT (src, "cur_idx_tensor_cnt:%u", cur_idx_tensor_cnt);
+
+  if (shuffled_index + 1 == src->tensor_count_array_len) {
+    next_idx_tensor_cnt = src->tensor_size_array_len;
+  } else {
+    next_idx_tensor_cnt =
+        json_array_get_int_element (src->tensor_count_array,
+        shuffled_index + 1);
+  }
+  GST_DEBUG_OBJECT (src, "next_idx_tensor_cnt:%u", next_idx_tensor_cnt);
+
+  num_tensors = next_idx_tensor_cnt - cur_idx_tensor_cnt;
+  GST_DEBUG_OBJECT (src, "num_tensors:%u", num_tensors);
+
+  return num_tensors;
+}
+
+/**
+ * @brief Function to read tensors
+ */
+static GstFlowReturn
+gst_data_repo_src_read_flexible_tensors (GstDataRepoSrc * src,
+    GstBuffer ** buffer)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+  guint i;
+  guint shuffled_index = 0;
+  gint64 sample_offset;
+  guint num_tensors = 0;
+  GstBuffer *buf;
+  GstMemory *mem;
+  GstMapInfo info;
+  GstTensorMetaInfo meta;
+  guint to_read, byte_read;
+  int read_size;
+  guint8 *data;
+  guint tensor_count;
+  guint tensor_size;
+
+  g_return_val_if_fail (src->fd != 0, GST_FLOW_ERROR);
+  g_return_val_if_fail (src->shuffled_index_array != NULL, GST_FLOW_ERROR);
+
+  if (gst_data_repo_src_epoch_is_done (src)) {
+    if (src->epochs == 0) {
+      GST_LOG_OBJECT (src, "send EOS");
+      return GST_FLOW_EOS;
+    }
+    if (src->is_shuffle)
+      gst_data_repo_src_shuffle_samples_index (src);
+  }
+
+  /* only do for first epoch */
+  if (!src->first_epoch_is_done) {
+    /* append samples index to array */
+    g_array_append_val (src->shuffled_index_array, src->current_sample_index);
+    src->current_sample_index++;
+  }
+
+  shuffled_index =
+      g_array_index (src->shuffled_index_array, guint, src->array_index++);
+  GST_LOG_OBJECT (src, "shuffled_index [%d] -> %d", src->array_index - 1,
+      shuffled_index);
+
+  /* sample offset from 0 */
+  sample_offset =
+      json_array_get_int_element (src->sample_offset_array, shuffled_index);
+  GST_LOG_OBJECT (src, "sample offset 0x%" G_GINT64_MODIFIER "x (%d size)",
+      sample_offset, (guint) sample_offset);
+
+  src->fd_offset = lseek (src->fd, sample_offset, SEEK_SET);
+
+  buf = gst_buffer_new ();
+
+  tensor_count =
+      json_array_get_int_element (src->tensor_count_array, shuffled_index);
+  num_tensors = gst_data_repo_src_get_num_tensors (src, shuffled_index);
+
+  for (i = 0; i < num_tensors; i++) {
+    tensor_size =
+        json_array_get_int_element (src->tensor_size_array, tensor_count + i);
+    mem = gst_allocator_alloc (NULL, tensor_size, NULL);
+
+    if (!gst_memory_map (mem, &info, GST_MAP_WRITE)) {
+      GST_ERROR_OBJECT (src, "Could not map GstMemory[%d]", i);
+      ret = GST_FLOW_ERROR;
+      goto error;
+    }
+
+    data = info.data;
+    byte_read = 0;
+    to_read = tensor_size;
+    while (to_read > 0) {
+      GST_LOG_OBJECT (src,
+          "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x (%lld size)",
+          to_read, src->fd_offset + byte_read, (long long) src->fd_offset);
+      errno = 0;
+      read_size = read (src->fd, data + byte_read, to_read);
+      GST_LOG_OBJECT (src, "Read: %d", read_size);
+      if (read_size < 0) {
+        if (errno == EAGAIN || errno == EINTR)
+          continue;
+        GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+        ret = GST_FLOW_ERROR;
+        goto error;
+      }
+      /* files should eos if they read 0 and more was requested */
+      if (read_size == 0) {
+        /* .. but first we should return any remaining data */
+        if (byte_read > 0)
+          break;
+        GST_DEBUG ("EOS");
+        ret = GST_FLOW_EOS;
+        goto error;
+      }
+      to_read -= read_size;
+      byte_read += read_size;
+
+      src->read_position += read_size;
+      src->fd_offset += read_size;
+    }
+
+    /* check invalid flexible tensor */
+    if (!gst_tensor_meta_info_parse_header (&meta, info.data)) {
+      GST_ERROR_OBJECT (src, "Invalid flexible tensors");
+      ret = GST_FLOW_ERROR;
+      goto error;
+    }
+
+    gst_memory_unmap (mem, &info);
+    gst_buffer_append_memory (buf, mem);
   }
+
+  *buffer = buf;
+
+  return GST_FLOW_OK;
+
 error:
+  if (mem) {
+    gst_memory_unmap (mem, &info);
+    gst_memory_unref (mem);
+  }
   gst_buffer_unref (buf);
-  return GST_FLOW_ERROR;
+
+  return ret;
 }
 
 /**
@@ -631,7 +803,7 @@ gst_data_repo_src_read_multi_images (GstDataRepoSrc * src, GstBuffer ** buffer)
   gchar *data;
   gchar *filename;
   GstBuffer *buf;
-  gboolean ret;
+  gboolean read_size;
   GError *error = NULL;
 
   g_return_val_if_fail (src->shuffled_index_array != NULL, GST_FLOW_ERROR);
@@ -657,8 +829,8 @@ gst_data_repo_src_read_multi_images (GstDataRepoSrc * src, GstBuffer ** buffer)
   src->array_index++;
 
   /* Try to read one image */
-  ret = g_file_get_contents (filename, &data, &size, &error);
-  if (!ret) {
+  read_size = g_file_get_contents (filename, &data, &size, &error);
+  if (!read_size) {
     if (src->successful_read) {
       /* If we've read at least one buffer successfully, not finding the next file is EOS. */
       g_free (filename);
@@ -706,9 +878,10 @@ handle_error:
 static GstFlowReturn
 gst_data_repo_src_read_others (GstDataRepoSrc * src, GstBuffer ** buffer)
 {
+  GstFlowReturn ret = GST_FLOW_OK;
   GstBuffer *buf;
   guint to_read, byte_read;
-  int ret;
+  int read_size;
   guint8 *data;
   GstMemory *mem;
   GstMapInfo info;
@@ -739,13 +912,13 @@ gst_data_repo_src_read_others (GstDataRepoSrc * src, GstBuffer ** buffer)
   GST_LOG_OBJECT (src, "shuffled_index [%d] -> %d", src->array_index - 1,
       shuffled_index);
   offset = gst_data_repo_src_get_file_offset (src, shuffled_index);
-  src->offset = lseek (src->fd, offset, SEEK_SET);
+  src->fd_offset = lseek (src->fd, offset, SEEK_SET);
 
   mem = gst_allocator_alloc (NULL, src->sample_size, NULL);
 
   if (!gst_memory_map (mem, &info, GST_MAP_WRITE)) {
     GST_ERROR_OBJECT (src, "Could not map GstMemory");
-    goto error;
+    return GST_FLOW_ERROR;
   }
 
   data = info.data;
@@ -754,31 +927,34 @@ gst_data_repo_src_read_others (GstDataRepoSrc * src, GstBuffer ** buffer)
   to_read = src->sample_size;
   while (to_read > 0) {
     GST_LOG_OBJECT (src, "Reading %d bytes at offset 0x%" G_GINT64_MODIFIER "x",
-        to_read, src->offset + byte_read);
+        to_read, src->fd_offset + byte_read);
     errno = 0;
-    ret = read (src->fd, data + byte_read, to_read);
-    GST_LOG_OBJECT (src, "Read: %d", ret);
-    if (ret < 0) {
+    read_size = read (src->fd, data + byte_read, to_read);
+    GST_LOG_OBJECT (src, "Read: %d", read_size);
+    if (read_size < 0) {
       if (errno == EAGAIN || errno == EINTR)
         continue;
-      goto could_not_read;
+      GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
+      ret = GST_FLOW_ERROR;
+      goto error;
     }
     /* files should eos if they read 0 and more was requested */
-    if (ret == 0) {
+    if (read_size == 0) {
       /* .. but first we should return any remaining data */
       if (byte_read > 0)
         break;
-      goto eos;
+      GST_DEBUG ("EOS");
+      ret = GST_FLOW_EOS;
+      goto error;
     }
-    to_read -= ret;
-    byte_read += ret;
+    to_read -= read_size;
+    byte_read += read_size;
 
-    src->read_position += ret;
-    src->offset += ret;
+    src->read_position += read_size;
+    src->fd_offset += read_size;
   }
 
-  if (mem)
-    gst_memory_unmap (mem, &info);
+  gst_memory_unmap (mem, &info);
 
   buf = gst_buffer_new ();
   gst_buffer_append_memory (buf, mem);
@@ -786,20 +962,13 @@ gst_data_repo_src_read_others (GstDataRepoSrc * src, GstBuffer ** buffer)
   *buffer = buf;
   return GST_FLOW_OK;
 
-could_not_read:
-  {
-    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
-    gst_memory_unmap (mem, &info);
-    return GST_FLOW_ERROR;
-  }
-eos:
-  {
-    GST_DEBUG ("EOS");
+error:
+  if (mem) {
     gst_memory_unmap (mem, &info);
-    return GST_FLOW_EOS;
+    gst_memory_unref (mem);
   }
-error:
-  return GST_FLOW_ERROR;
+
+  return ret;
 }
 
 /**
@@ -841,6 +1010,10 @@ gst_data_repo_src_start (GstDataRepoSrc * src)
   if (fstat (src->fd, &stat_results) < 0)
     goto no_stat;
 
+  src->file_size = stat_results.st_size;
+  if (src->file_size == 0)
+    goto error_close;
+
   if (S_ISDIR (stat_results.st_mode))
     goto was_directory;
 
@@ -866,9 +1039,9 @@ gst_data_repo_src_start (GstDataRepoSrc * src)
     src->last_offset =
         gst_data_repo_src_get_file_offset (src, src->stop_sample_index);
 
-    src->offset = lseek (src->fd, src->start_offset, SEEK_SET);
+    src->fd_offset = lseek (src->fd, src->start_offset, SEEK_SET);
     GST_LOG_OBJECT (src, "Start file offset 0x%" G_GINT64_MODIFIER "x",
-        src->offset);
+        src->fd_offset);
   }
 
   g_free (filename);
@@ -949,7 +1122,10 @@ gst_data_repo_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer)
     case GST_DATA_REPO_DATA_OCTET:
       return gst_data_repo_src_read_others (src, buffer);
     case GST_DATA_REPO_DATA_TENSOR:
-      return gst_data_repo_src_read_tensors (src, buffer);
+      if (src->is_flexible_tensors)
+        return gst_data_repo_src_read_flexible_tensors (src, buffer);
+      else
+        return gst_data_repo_src_read_tensors (src, buffer);
     case GST_DATA_REPO_DATA_IMAGE:
       return gst_data_repo_src_read_multi_images (src, buffer);
     default:
@@ -1163,6 +1339,10 @@ gst_data_repo_src_set_caps (GstBaseSrc * basesrc, GstCaps * caps)
 static gboolean
 gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps)
 {
+  GstStructure *s;
+  const GValue *v;
+  const gchar *format;
+
   g_return_val_if_fail (src != NULL, FALSE);
   g_return_val_if_fail (caps != NULL, FALSE);
 
@@ -1177,6 +1357,11 @@ gst_data_repo_src_get_data_type_and_size (GstDataRepoSrc * src, GstCaps * caps)
       break;
     case GST_DATA_REPO_DATA_TENSOR:
       src->sample_size = gst_data_repo_src_get_tensors_size (src, caps);
+      s = gst_caps_get_structure (caps, 0);
+      v = gst_structure_get_value (s, "format");
+      format = g_value_get_string (v);
+      if (g_strcmp0 (format, "flexible") == 0)
+        src->is_flexible_tensors = TRUE;
       break;
     default:
       break;
@@ -1195,7 +1380,6 @@ gst_data_repo_src_read_json_file (GstDataRepoSrc * src)
   GError *error = NULL;
   GFile *file;
   gchar *contents;
-  JsonParser *parser;
   JsonNode *root;
   JsonObject *object;
   const gchar *caps_str = NULL;
@@ -1218,13 +1402,15 @@ gst_data_repo_src_read_json_file (GstDataRepoSrc * src)
     return FALSE;
   }
 
-  parser = json_parser_new ();
-  if (!json_parser_load_from_data (parser, contents, -1, NULL)) {
+
+  src->parser = json_parser_new ();
+
+  if (!json_parser_load_from_data (src->parser, contents, -1, NULL)) {
     GST_ERROR_OBJECT (src, "Failed to load data from %s", src->json_filename);
     goto error;
   }
 
-  root = json_parser_get_root (parser);
+  root = json_parser_get_root (src->parser);
   if (!JSON_NODE_HOLDS_OBJECT (root)) {
     GST_ERROR_OBJECT (src, "it does not contain a JsonObject: %s", contents);
     goto error;
@@ -1263,8 +1449,40 @@ gst_data_repo_src_read_json_file (GstDataRepoSrc * src)
     GST_INFO_OBJECT (src, "sample_size: %d", src->sample_size);
   }
 
-  if (src->sample_size == 0)
-    goto error;
+  if (src->is_flexible_tensors) {
+    if (!json_object_has_member (object, "sample_offset")) {
+      GST_ERROR_OBJECT (src, "There is not sample_offset field: %s", contents);
+      goto error;
+    }
+    src->sample_offset_array =
+        json_object_get_array_member (object, "sample_offset");
+    src->sample_offset_array_len =
+        json_array_get_length (src->sample_offset_array);
+
+    if (!json_object_has_member (object, "tensor_size")) {
+      GST_ERROR_OBJECT (src, "There is not tensor_size field: %s", contents);
+      goto error;
+    }
+    src->tensor_size_array =
+        json_object_get_array_member (object, "tensor_size");
+    src->tensor_size_array_len = json_array_get_length (src->tensor_size_array);
+    GST_INFO_OBJECT (src, "tensor_size_array_len:%u",
+        src->tensor_size_array_len);
+
+    if (!json_object_has_member (object, "tensor_count")) {
+      GST_ERROR_OBJECT (src, "There is not tensor_count field: %s", contents);
+      goto error;
+    }
+    src->tensor_count_array =
+        json_object_get_array_member (object, "tensor_count");
+    src->tensor_count_array_len =
+        json_array_get_length (src->tensor_count_array);
+    GST_INFO_OBJECT (src, "tensor_count_array_len:%u",
+        src->tensor_count_array_len);
+  } else {
+    if (src->sample_size == 0)
+      goto error;
+  }
 
   if (!json_object_has_member (object, "total_samples")) {
     GST_ERROR_OBJECT (src, "There is not total_samples field: %s", contents);
@@ -1278,16 +1496,13 @@ gst_data_repo_src_read_json_file (GstDataRepoSrc * src)
     goto error;
 
   g_free (contents);
-  g_object_unref (parser);
   g_object_unref (file);
 
   return TRUE;
-
 error:
   src->data_type = GST_DATA_REPO_DATA_UNKNOWN;
   GST_ERROR_OBJECT (src, "Failed to parse %s", src->json_filename);
   g_free (contents);
-  g_object_unref (parser);
   g_object_unref (file);
 
   return FALSE;
@@ -1430,7 +1645,7 @@ gst_data_repo_src_change_state (GstElement * element, GstStateChange transition)
       /** if data_type is not GST_DATA_REPO_DATA_UNKNOWN and sample_size is 0 then
           'caps' is set by property and sample size needs to be set by blocksize
           (in the case of otect and text) */
-      if (src->sample_size == 0) {
+      if (src->sample_size == 0 && !src->is_flexible_tensors) {
         basesrc = GST_BASE_SRC (src);
         g_object_get (G_OBJECT (basesrc), "blocksize", &blocksize, NULL);
         GST_DEBUG_OBJECT (src, "blocksize = %d", blocksize);
index d6c82c2..74077c7 100644 (file)
@@ -16,6 +16,7 @@
 #include <sys/types.h>
 #include <gst/gst.h>
 #include <gst/base/gstpushsrc.h>
+#include <json-glib/json-glib.h>
 #include <tensor_typedef.h>
 #include "gstdatarepo.h"
 
@@ -44,11 +45,13 @@ struct _GstDataRepoSrc {
   GstPushSrc parent;            /**< parent object */
   GstPad *src_pad;
 
+  gboolean is_flexible_tensors;
   gboolean is_start;            /**< check if datareposrc is started */
   gboolean successful_read;     /**< used for checking EOS when reading more than one images(multi-files) from a path */
   gint fd;                      /**< open file descriptor */
+  gint file_size;               /**< file size, in bytes */
   guint64 read_position;        /**< position of fd */
-  guint64 offset;               /**< offset of fd */
+  guint64 fd_offset;            /**< offset of fd */
   guint64 start_offset;         /**< start offset to read */
   guint64 last_offset;          /**< last offset to read */
   guint tensors_size[MAX_ITEM];   /**< each tensors size in a sample */
@@ -77,6 +80,15 @@ struct _GstDataRepoSrc {
   guint tensors_seq_cnt;
   gboolean need_changed_caps;   /**< When tensors-sequence changes, caps need to be changed */
   GstCaps *caps;                /**< optional property, datareposrc should get data format from JSON file caps field */
+
+  /* flexible tensors */
+  JsonArray *sample_offset_array;   /**< offset array of sample */
+  JsonArray *tensor_size_array;     /**< size array of flexible tensor to be stored in a Gstbuffer */
+  JsonArray *tensor_count_array;    /**< array for the number of cumulative tensors */
+  JsonParser *parser;               /**< Keep JSON data after parsing JSON file */
+  guint sample_offset_array_len;
+  guint tensor_size_array_len;
+  guint tensor_count_array_len;
 };
 
 /**
index 15485a2..ea59fbd 100644 (file)
@@ -45,6 +45,7 @@ bus_callback (GstBus *bus, GstMessage *message, gpointer data)
 {
   switch (GST_MESSAGE_TYPE (message)) {
     case GST_MESSAGE_EOS:
+    case GST_MESSAGE_ERROR:
       g_main_loop_quit ((GMainLoop *) data);
       break;
     default:
@@ -55,23 +56,66 @@ bus_callback (GstBus *bus, GstMessage *message, gpointer data)
 }
 
 /**
- * @brief create video test file
+ * @brief Callback for tensor sink signal.
  */
 static void
-create_video_test_file ()
+new_data_cb (GstElement *element, GstBuffer *buffer, gint *user_data)
+{
+  (*user_data)++;
+  g_warning ("count:%d", *user_data);
+  return;
+}
+
+/**
+ * @brief create flexible tensors file
+ */
+static void
+create_flexible_tensors_test_file ()
 {
   GstBus *bus;
   GMainLoop *loop;
+  const gchar *str_pipeline
+      = "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+        "video/x-raw,format=RGB,width=176,height=144,framerate=10/1 ! tensor_converter ! join0.sink_0 "
+        "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+        "video/x-raw,format=RGB,width=320,height=240,framerate=10/1 ! tensor_converter ! join0.sink_1 "
+        "videotestsrc num-buffers=3 ! videoconvert ! videoscale ! "
+        "video/x-raw,format=RGB,width=640,height=480,framerate=10/1 ! tensor_converter ! join0.sink_2 "
+        "join name=join0 ! other/tensors,format=flexible ! "
+        "datareposink location=flexible.data json=flexible.json";
+
+  GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
 
   loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+}
 
-  gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=10 ! "
-                                  "datareposink location=video1.raw json=video1.json");
+/**
+ * @brief create video test file
+ */
+static void
+create_video_test_file ()
+{
+  GstBus *bus;
+  GMainLoop *loop;
+  const gchar *str_pipeline = "videotestsrc num-buffers=10 ! "
+                              "datareposink location=video1.raw json=video1.json";
 
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
+  loop = g_main_loop_new (NULL, FALSE);
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
   ASSERT_NE (bus, nullptr);
   gst_bus_add_watch (bus, bus_callback, loop);
@@ -93,16 +137,13 @@ create_audio_test_file ()
 {
   GstBus *bus;
   GMainLoop *loop;
-
-  loop = g_main_loop_new (NULL, FALSE);
-
-  gchar *str_pipeline = g_strdup (
-      "gst-launch-1.0 audiotestsrc samplesperbuffer=44100 num-buffers=1 ! "
-      "audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! "
-      "datareposink location=audio1.raw json=audio1.json");
+  const gchar *str_pipeline
+      = "audiotestsrc samplesperbuffer=44100 num-buffers=1 ! "
+        "audio/x-raw, format=S16LE, layout=interleaved, rate=44100, channels=1 ! "
+        "datareposink location=audio1.raw json=audio1.json";
 
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
+  loop = g_main_loop_new (NULL, FALSE);
   ASSERT_NE (pipeline, nullptr);
 
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
@@ -126,16 +167,13 @@ create_image_test_file ()
 {
   GstBus *bus;
   GMainLoop *loop;
-
-  loop = g_main_loop_new (NULL, FALSE);
-
-  gchar *str_pipeline = g_strdup ("gst-launch-1.0 videotestsrc num-buffers=5 ! pngenc ! "
-                                  "datareposink location=img_%02d.png json=img.json");
+  const gchar *str_pipeline = "videotestsrc num-buffers=5 ! pngenc ! "
+                              "datareposink location=img_%02d.png json=img.json";
 
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
+  loop = g_main_loop_new (NULL, FALSE);
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
   ASSERT_NE (bus, nullptr);
   gst_bus_add_watch (bus, bus_callback, loop);
@@ -154,32 +192,49 @@ create_image_test_file ()
  */
 TEST (datareposrc, readImageFiles)
 {
+  gchar *filename;
+  gint buffer_count = 0, i;
+  GCallback handler = G_CALLBACK (new_data_cb);
+  GstElement *tensor_sink;
   GstBus *bus;
   GMainLoop *loop;
+  const gchar *str_pipeline
+      = "datareposrc location=img_%02d.png json=img.json start-sample-index=0 stop-sample-index=4 !"
+        "pngdec ! tensor_converter ! tensor_sink name=tensor_sink0";
 
   create_image_test_file ();
 
-  loop = g_main_loop_new (NULL, FALSE);
-
-  gchar *str_pipeline = g_strdup (
-      "gst-launch-1.0 datareposrc location=img_%02d.png json=img.json "
-      "start-sample-index=0 stop-sample-index=4 ! pngdec ! fakesink");
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
+  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+  ASSERT_NE (tensor_sink, nullptr);
+
+  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+  loop = g_main_loop_new (NULL, FALSE);
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
   ASSERT_NE (bus, nullptr);
   gst_bus_add_watch (bus, bus_callback, loop);
   gst_object_unref (bus);
 
-  EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
-
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
   g_main_loop_run (loop);
 
   setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+  EXPECT_NE (buffer_count, 0);
+  handler = NULL;
+
+  gst_object_unref (tensor_sink);
   gst_object_unref (pipeline);
   g_main_loop_unref (loop);
+
+  for (i = 0; i < 5; i++) {
+    filename = g_strdup_printf ("img_%02d.png", i);
+    g_remove (filename);
+    g_free (filename);
+  }
 }
 
 /**
@@ -187,31 +242,43 @@ TEST (datareposrc, readImageFiles)
  */
 TEST (datareposrc, readVideoRaw)
 {
+  gint buffer_count = 0;
+  GstElement *tensor_sink;
   GstBus *bus;
   GMainLoop *loop;
+  GCallback handler = G_CALLBACK (new_data_cb);
+  const gchar *str_pipeline
+      = "datareposrc location=video1.raw json=video1.json ! tensor_converter ! tensor_sink name=tensor_sink0";
 
   create_video_test_file ();
 
-  loop = g_main_loop_new (NULL, FALSE);
-
-  gchar *str_pipeline = g_strdup (
-      "gst-launch-1.0 datareposrc location=video1.raw json=video1.json ! fakesink");
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
+  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+  ASSERT_NE (tensor_sink, nullptr);
+
+  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+  loop = g_main_loop_new (NULL, FALSE);
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
   ASSERT_NE (bus, nullptr);
   gst_bus_add_watch (bus, bus_callback, loop);
   gst_object_unref (bus);
 
-  EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
-
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
   g_main_loop_run (loop);
 
   setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  EXPECT_NE (buffer_count, 0);
+  handler = NULL;
+
+  gst_object_unref (tensor_sink);
   gst_object_unref (pipeline);
   g_main_loop_unref (loop);
+
+  g_remove ("video1.json");
+  g_remove ("video1.raw");
 }
 
 /**
@@ -219,31 +286,44 @@ TEST (datareposrc, readVideoRaw)
  */
 TEST (datareposrc, readAudioRaw)
 {
+  gint buffer_count = 0;
+  GstElement *tensor_sink;
   GstBus *bus;
   GMainLoop *loop;
+  GCallback handler = G_CALLBACK (new_data_cb);
+  const gchar *str_pipeline
+      = "datareposrc location=audio1.raw json=audio1.json ! tensor_converter ! tensor_sink name=tensor_sink0";
 
   create_audio_test_file ();
 
-  loop = g_main_loop_new (NULL, FALSE);
-
-  gchar *str_pipeline = g_strdup (
-      "gst-launch-1.0 datareposrc location=audio1.raw json=audio1.json ! fakesink");
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
+  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+  ASSERT_NE (tensor_sink, nullptr);
+
+  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+  loop = g_main_loop_new (NULL, FALSE);
   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
   ASSERT_NE (bus, nullptr);
   gst_bus_add_watch (bus, bus_callback, loop);
   gst_object_unref (bus);
 
-  EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT), 0);
-
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
   g_main_loop_run (loop);
 
   setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+  EXPECT_NE (buffer_count, 0);
+  handler = NULL;
+
+  gst_object_unref (tensor_sink);
   gst_object_unref (pipeline);
   g_main_loop_unref (loop);
+
+  g_remove ("audio1.json");
+  g_remove ("audio1.raw");
 }
 
 /**
@@ -252,10 +332,9 @@ TEST (datareposrc, readAudioRaw)
 TEST (datareposrc, invalidJsonPath0_n)
 {
   GstElement *datareposrc = NULL;
+  const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
 
-  gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
   datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
@@ -277,10 +356,9 @@ TEST (datareposrc, invalidJsonPath0_n)
 TEST (datareposrc, invalidJsonPath1_n)
 {
   GstElement *datareposrc = NULL;
+  const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
 
-  gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
   datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
@@ -304,10 +382,9 @@ TEST (datareposrc, invalidJsonPath1_n)
 TEST (datareposrc, invalidFilePath0_n)
 {
   GstElement *datareposrc = NULL;
+  const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
 
-  gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
   datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
@@ -329,10 +406,9 @@ TEST (datareposrc, invalidFilePath0_n)
 TEST (datareposrc, invalidFilePath1_n)
 {
   GstElement *datareposrc = NULL;
+  const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
 
-  gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
   datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
@@ -354,10 +430,9 @@ TEST (datareposrc, invalidFilePath1_n)
 TEST (datareposrc, invalidCapsWithoutJSON_n)
 {
   GstElement *datareposrc = NULL;
+  const gchar *str_pipeline = "datareposrc name=datareposrc ! fakesink";
 
-  gchar *str_pipeline = g_strdup ("gst-launch-1.0 datareposrc name=datareposrc ! fakesink");
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
-  g_free (str_pipeline);
   ASSERT_NE (pipeline, nullptr);
 
   datareposrc = gst_bin_get_by_name (GST_BIN (pipeline), "datareposrc");
@@ -396,10 +471,9 @@ TEST (datareposrc, readTensors)
   file_path = get_file_path (filename);
   json_path = get_file_path (json);
 
-  gchar *str_pipeline = g_strdup_printf (
-      "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
-      "start-sample-index=0 stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! "
-      "fakesink",
+  gchar *str_pipeline = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+                                         "start-sample-index=0 stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! "
+                                         "fakesink",
       file_path, json_path);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
   g_free (str_pipeline);
@@ -429,7 +503,7 @@ TEST (datareposrc, readTensors)
 
   g_main_loop_run (loop);
 
-  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT), 0);
 
   gst_object_unref (pipeline);
   g_main_loop_unref (loop);
@@ -438,6 +512,50 @@ TEST (datareposrc, readTensors)
 }
 
 /**
+ * @brief Test for reading a file composed of flexible tensors
+ * the default shuffle is TRUE.
+ */
+TEST (datareposrc, readFlexibleTensors)
+{
+  gint buffer_count = 0;
+  GstElement *tensor_sink;
+  GstBus *bus;
+  const gchar *str_pipeline = NULL;
+  GMainLoop *loop;
+  GCallback handler = G_CALLBACK (new_data_cb);
+  str_pipeline = "datareposrc location=flexible.data json=flexible.json ! tensor_sink name=tensor_sink0";
+
+  create_flexible_tensors_test_file ();
+  GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
+
+  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+  ASSERT_NE (tensor_sink, nullptr);
+
+  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  EXPECT_NE (buffer_count, 0);
+  handler = NULL;
+
+  gst_object_unref (tensor_sink);
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
+
+  g_remove ("flexible.json");
+  g_remove ("flexible.data");
+}
+
+/**
  * @brief Test for reading a tensors file with Caps property
  */
 TEST (datareposrc, readTensorsNoJSONWithCapsParam)
@@ -455,7 +573,7 @@ TEST (datareposrc, readTensorsNoJSONWithCapsParam)
   file_path = get_file_path (filename);
 
   gchar *str_pipeline = g_strdup_printf (
-      "gst-launch-1.0 datareposrc name=datareposrc location=%s "
+      "datareposrc name=datareposrc location=%s "
       "start-sample-index=0 stop-sample-index=9 epochs=2 tensors-sequence=0,1 "
       "caps =\"other/tensors, format=(string)static, framerate=(fraction)0/1, "
       "num_tensors=(int)2, dimensions=(string)1:1:784:1.1:1:10:1, types=(string)float32.float32\" ! "
@@ -486,7 +604,7 @@ TEST (datareposrc, readTensorsNoJSONWithCapsParam)
 
   g_main_loop_run (loop);
 
-  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+  EXPECT_EQ (setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT), 0);
 
   gst_object_unref (pipeline);
   g_main_loop_unref (loop);
@@ -508,10 +626,10 @@ TEST (datareposrc, invalidStartSampleIndex0_n)
   file_path = get_file_path (filename);
   json_path = get_file_path (json);
 
-  gchar *str_pipeline = g_strdup_printf (
-      "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
-      "stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! fakesink",
-      file_path, json_path);
+  gchar *str_pipeline
+      = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+                         "stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! fakesink",
+          file_path, json_path);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
   g_free (str_pipeline);
   g_free (file_path);
@@ -545,10 +663,10 @@ TEST (datareposrc, invalidStartSampleIndex1_n)
   file_path = get_file_path (filename);
   json_path = get_file_path (json);
 
-  gchar *str_pipeline = g_strdup_printf (
-      "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
-      "stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! fakesink",
-      file_path, json_path);
+  gchar *str_pipeline
+      = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+                         "stop-sample-index=9 epochs=2 tensors-sequence=0,1 ! fakesink",
+          file_path, json_path);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
   g_free (str_pipeline);
   g_free (file_path);
@@ -582,10 +700,10 @@ TEST (datareposrc, invalidStopSampleIndex0_n)
   file_path = get_file_path (filename);
   json_path = get_file_path (json);
 
-  gchar *str_pipeline = g_strdup_printf (
-      "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
-      "start-sample-index=0 epochs=2 tensors-sequence=0,1 ! fakesink",
-      file_path, json_path);
+  gchar *str_pipeline
+      = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+                         "start-sample-index=0 epochs=2 tensors-sequence=0,1 ! fakesink",
+          file_path, json_path);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
   g_free (str_pipeline);
   g_free (file_path);
@@ -618,10 +736,10 @@ TEST (datareposrc, invalidStopSampleIndex1_n)
   file_path = get_file_path (filename);
   json_path = get_file_path (json);
 
-  gchar *str_pipeline = g_strdup_printf (
-      "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
-      "start-sample-index=0 epochs=2 tensors-sequence=0,1 ! fakesink",
-      file_path, json_path);
+  gchar *str_pipeline
+      = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
+                         "start-sample-index=0 epochs=2 tensors-sequence=0,1 ! fakesink",
+          file_path, json_path);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
   g_free (str_pipeline);
   g_free (file_path);
@@ -656,7 +774,7 @@ TEST (datareposrc, invalidEpochs0_n)
   json_path = get_file_path (json);
 
   gchar *str_pipeline = g_strdup_printf (
-      "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
+      "datareposrc name=datareposrc location=%s json=%s "
       "start-sample-index=0 stop-sample-index=9 tensors-sequence=0,1 ! fakesink",
       file_path, json_path);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
@@ -692,7 +810,7 @@ TEST (datareposrc, invalidEpochs1_n)
   json_path = get_file_path (json);
 
   gchar *str_pipeline = g_strdup_printf (
-      "gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
+      "datareposrc name=datareposrc location=%s json=%s "
       "start-sample-index=0 stop-sample-index=9 tensors-sequence=0,1 ! fakesink",
       file_path, json_path);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
@@ -727,7 +845,7 @@ TEST (datareposrc, invalidTensorsSequence0_n)
   json_path = get_file_path (json);
 
   gchar *str_pipeline
-      = g_strdup_printf ("gst-launch-1.0 datareposrc name=datareposrc location=%s json=%s "
+      = g_strdup_printf ("datareposrc name=datareposrc location=%s json=%s "
                          "start-sample-index=0 stop-sample-index=9 ! fakesink",
           file_path, json_path);
   GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
@@ -749,25 +867,54 @@ TEST (datareposrc, invalidTensorsSequence0_n)
 }
 
 /**
- * @brief remove test file
+ * @brief Test for reading a file composed of non-flexible tensors
+ * the default shuffle is TRUE.
  */
-static void
-remove_test_file (void)
+TEST (datareposrc, readInvalidFlexibleTensors)
 {
-  gchar *filename = NULL;
-  int i;
+  gint buffer_count = 0;
+  GstBus *bus;
+  GMainLoop *loop;
+  GCallback handler = G_CALLBACK (new_data_cb);
+  const gchar *str_pipeline
+      = "datareposrc location=audio1.raw json=flexible.json ! tensor_sink name=tensor_sink0";
+  GstElement *tensor_sink;
+
+  create_flexible_tensors_test_file ();
+  create_audio_test_file ();
+
+  GstElement *pipeline = gst_parse_launch (str_pipeline, NULL);
+  ASSERT_NE (pipeline, nullptr);
+
+  tensor_sink = gst_bin_get_by_name (GST_BIN (pipeline), "tensor_sink0");
+  ASSERT_NE (tensor_sink, nullptr);
+
+  g_signal_connect (tensor_sink, "new-data", (GCallback) handler, &buffer_count);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
+  ASSERT_NE (bus, nullptr);
+  gst_bus_add_watch (bus, bus_callback, loop);
+  gst_object_unref (bus);
+
+  /* EXPECT_EQ not checked due to internal data stream error */
+  setPipelineStateSync (pipeline, GST_STATE_PLAYING, UNITTEST_STATECHANGE_TIMEOUT);
+  g_main_loop_run (loop);
+
+  setPipelineStateSync (pipeline, GST_STATE_NULL, UNITTEST_STATECHANGE_TIMEOUT);
+
+  /* Internal data stream error */
+  EXPECT_EQ (buffer_count, 0);
+  handler = NULL;
+
+  gst_object_unref (tensor_sink);
+  gst_object_unref (pipeline);
+  g_main_loop_unref (loop);
 
   g_remove ("audio1.json");
   g_remove ("audio1.raw");
-  g_remove ("video1.json");
-  g_remove ("video1.raw");
-  g_remove ("img.son");
-
-  for (i = 0; i < 5; i++) {
-    filename = g_strdup_printf ("img_%02d.png", i);
-    g_remove (filename);
-    g_free (filename);
-  }
+  g_remove ("flexible.json");
+  g_remove ("flexible.data");
 }
 
 /**
@@ -792,7 +939,5 @@ main (int argc, char **argv)
     g_warning ("catch `testing::internal::GoogleTestFailureException`");
   }
 
-  remove_test_file ();
-
   return result;
 }