[C-Api] handle flexible tensor
authorJaeyun <jy1210.jung@samsung.com>
Fri, 23 Apr 2021 11:17:35 +0000 (20:17 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Fri, 25 Jun 2021 07:31:34 +0000 (16:31 +0900)
In flexible tensor stream, sink and src handle cannot set exact tensor info from pad caps.
To handle flex-tensor, ignore tensor info while configuring the element handle.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
c/include/nnstreamer-capi-private.h
c/src/nnstreamer-capi-pipeline.c
tests/capi/unittest_capi_inference.cc

index da4f484..22b8be2 100644 (file)
@@ -256,6 +256,7 @@ typedef struct _ml_pipeline_element {
 
   GMutex lock; /**< Lock for internal values */
   gboolean is_media_stream;
+  gboolean is_flexible_tensor;
 
   ml_handle_destroy_cb custom_destroy;
   gpointer custom_data;
index 6955935..040ffe7 100644 (file)
@@ -230,6 +230,7 @@ construct_element (GstElement * e, ml_pipeline * p, const char *name,
   ret->maxid = 0;
   ret->handle_id = 0;
   ret->is_media_stream = FALSE;
+  ret->is_flexible_tensor = FALSE;
   g_mutex_init (&ret->lock);
   return ret;
 }
@@ -238,7 +239,8 @@ construct_element (GstElement * e, ml_pipeline * p, const char *name,
  * @brief Internal function to get the tensors info from the element caps.
  */
 static gboolean
-get_tensors_info_from_caps (GstCaps * caps, ml_tensors_info_s * info)
+get_tensors_info_from_caps (GstCaps * caps, ml_tensors_info_s * info,
+    gboolean * is_flexible)
 {
   GstStructure *s;
   GstTensorsConfig config;
@@ -254,6 +256,7 @@ get_tensors_info_from_caps (GstCaps * caps, ml_tensors_info_s * info)
 
     if (found) {
       ml_tensors_info_copy_from_gst (info, &config.info);
+      *is_flexible = gst_tensors_info_is_flexible (&config.info);
       break;
     }
   }
@@ -277,6 +280,7 @@ cb_sink_event (GstElement * e, GstBuffer * b, gpointer user_data)
   GList *l;
   ml_tensors_data_s *_data = NULL;
   ml_tensors_info_s *_info;
+  ml_tensors_info_s info_flex_tensor;
   size_t total_size = 0;
   int status;
 
@@ -324,14 +328,20 @@ cb_sink_event (GstElement * e, GstBuffer * b, gpointer user_data)
       GstCaps *caps = gst_pad_get_current_caps (elem->sink);
 
       if (caps) {
-        gboolean found;
+        gboolean flexible = FALSE;
+        gboolean found = get_tensors_info_from_caps (caps, _info, &flexible);
 
-        found = get_tensors_info_from_caps (caps, _info);
         gst_caps_unref (caps);
 
         if (found) {
           elem->size = 0;
 
+          /* cannot get exact info from caps */
+          if (flexible) {
+            elem->is_flexible_tensor = TRUE;
+            goto send_cb;
+          }
+
           if (_info->num_tensors != num_mems) {
             ml_loge
                 ("The sink event of [%s] cannot be handled because the number of tensors mismatches.",
@@ -380,6 +390,31 @@ cb_sink_event (GstElement * e, GstBuffer * b, gpointer user_data)
     goto error;
   }
 
+send_cb:
+  /* set info for flexible stream */
+  if (elem->is_flexible_tensor) {
+    GstTensorMetaInfo meta;
+    GstTensorsInfo gst_info;
+    gsize hsize;
+
+    gst_tensors_info_init (&gst_info);
+    gst_info.num_tensors = num_mems;
+    _info = &info_flex_tensor;
+
+    /* handle header for flex tensor */
+    for (i = 0; i < num_mems; i++) {
+      gst_tensor_meta_info_parse_header (&meta, map[i].data);
+      hsize = gst_tensor_meta_info_get_header_size (&meta);
+
+      gst_tensor_meta_info_convert (&meta, &gst_info.info[i]);
+
+      _data->tensors[i].tensor = map[i].data + hsize;
+      _data->tensors[i].size = map[i].size - hsize;
+    }
+
+    ml_tensors_info_copy_from_gst (_info, &gst_info);
+  }
+
   /* Iterate e->handles, pass the data to them */
   for (l = elem->handles; l != NULL; l = l->next) {
     ml_pipeline_sink_cb callback;
@@ -1278,35 +1313,33 @@ ml_pipeline_src_parse_tensors_info (ml_pipeline_element * elem)
     } else {
       GstCaps *caps = gst_pad_get_allowed_caps (elem->src);
       guint i;
-      gboolean found = FALSE;
+      gboolean found, flexible;
       size_t sz;
 
+      found = flexible = FALSE;
       if (caps) {
-        found = get_tensors_info_from_caps (caps, _info);
+        found = get_tensors_info_from_caps (caps, _info, &flexible);
 
         if (!found && gst_caps_is_fixed (caps)) {
-          GstStructure *caps_s;
-          const gchar *mimetype;
-
-          caps_s = gst_caps_get_structure (caps, 0);
-          mimetype = gst_structure_get_name (caps_s);
+          GstStructure *st = gst_caps_get_structure (caps, 0);
 
-          if (!g_str_equal (mimetype, "other/tensor") &&
-              !g_str_equal (mimetype, "other/tensors")) {
+          if (!gst_structure_is_tensor_stream (st))
             elem->is_media_stream = TRUE;
-          }
+        } else if (found && flexible) {
+          /* flexible tensor, cannot get exact info from caps. */
+          elem->is_flexible_tensor = TRUE;
         }
 
         gst_caps_unref (caps);
       }
 
-      if (found) {
+      if (found && !flexible) {
         for (i = 0; i < _info->num_tensors; i++) {
           sz = ml_tensor_info_get_size (&_info->info[i]);
           elem->size += sz;
         }
       } else {
-        if (!elem->is_media_stream) {
+        if (!elem->is_media_stream && !elem->is_flexible_tensor) {
           ml_logw
               ("Cannot find caps. The pipeline is not yet negotiated for src element [%s].",
               elem->name);
@@ -1417,10 +1450,11 @@ ml_pipeline_src_input_data (ml_pipeline_src_h h, ml_tensors_data_h data,
     ml_pipeline_buf_policy_e policy)
 {
   GstBuffer *buffer;
-  GstMemory *mem;
+  GstMemory *mem, *tmp;
   gpointer mem_data;
   gsize mem_size;
   GstFlowReturn gret;
+  GstTensorsInfo gst_info;
   ml_tensors_data_s *_data;
   unsigned int i;
 
@@ -1449,7 +1483,7 @@ ml_pipeline_src_input_data (ml_pipeline_src_h h, ml_tensors_data_h data,
     goto dont_destroy_data;
   }
 
-  if (!elem->is_media_stream) {
+  if (!elem->is_media_stream && !elem->is_flexible_tensor) {
     if (elem->tensors_info.num_tensors != _data->num_tensors) {
       ml_loge
           ("The src push of [%s] cannot be handled because the number of tensors in a frame mismatches. %u != %u",
@@ -1475,18 +1509,32 @@ ml_pipeline_src_input_data (ml_pipeline_src_h h, ml_tensors_data_h data,
 
   /* Create buffer to be pushed from buf[] */
   buffer = gst_buffer_new ();
+  ml_tensors_info_copy_from_ml (&gst_info, _data->info);
+
   for (i = 0; i < _data->num_tensors; i++) {
     mem_data = _data->tensors[i].tensor;
     mem_size = _data->tensors[i].size;
 
-    mem = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
+    mem = tmp = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
         mem_data, mem_size, 0, mem_size, mem_data,
         (policy == ML_PIPELINE_BUF_POLICY_AUTO_FREE) ? g_free : NULL);
 
+    /* flex tensor, append header. */
+    if (elem->is_flexible_tensor) {
+      GstTensorMetaInfo meta;
+
+      gst_tensor_info_convert_to_meta (&gst_info.info[i], &meta);
+
+      mem = gst_tensor_meta_info_append_header (&meta, tmp);
+      gst_memory_unref (tmp);
+    }
+
     gst_buffer_append_memory (buffer, mem);
     /** @todo Verify that gst_buffer_append lists tensors/gstmem in the correct order */
   }
 
+  gst_tensors_info_free (&gst_info);
+
   /* Unlock if it's not auto-free. We do not know when it'll be freed. */
   if (policy != ML_PIPELINE_BUF_POLICY_AUTO_FREE)
     G_UNLOCK_UNLESS_NOLOCK (*_data);
index 78c406c..d7e8584 100644 (file)
@@ -10564,6 +10564,189 @@ TEST (nnstreamer_capi_flush, failure_02_n)
 }
 
 /**
+ * @brief A tensor-sink callback for sink handle in a pipeline
+ */
+static void
+test_sink_callback_flex (const ml_tensors_data_h data,
+    const ml_tensors_info_h info, void *user_data)
+{
+  guint *count = (guint *) user_data;
+  gint status;
+  gint *received;
+  guint total = 0;
+  size_t data_size;
+
+  G_LOCK (callback_lock);
+  *count = *count + 1;
+
+  status = ml_tensors_info_get_count (info, &total);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+  EXPECT_EQ (total, 3U);
+
+  status = ml_tensors_data_get_tensor_data (data, 0, (void **) &received, &data_size);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+  EXPECT_EQ (data_size, 4 * sizeof (gint));
+  EXPECT_EQ (received[0], 1);
+  EXPECT_EQ (received[1], 2);
+  EXPECT_EQ (received[2], 3);
+  EXPECT_EQ (received[3], 4);
+
+  status = ml_tensors_data_get_tensor_data (data, 1, (void **) &received, &data_size);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+  EXPECT_EQ (data_size, 2 * sizeof (gint));
+  EXPECT_EQ (received[0], 5);
+  EXPECT_EQ (received[1], 6);
+
+  status = ml_tensors_data_get_tensor_data (data, 2, (void **) &received, &data_size);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+  EXPECT_EQ (data_size, 4 * sizeof (gint));
+  EXPECT_EQ (received[0], 7);
+  EXPECT_EQ (received[1], 8);
+  EXPECT_EQ (received[2], 9);
+  EXPECT_EQ (received[3], 10);
+
+  G_UNLOCK (callback_lock);
+}
+
+/**
+ * @brief Test NNStreamer pipeline for flexible tensors.
+ */
+TEST (nnstreamer_capi_flex, sink_multi)
+{
+  gchar pipeline[] = "appsrc name=srcx caps=application/octet-stream,framerate=(fraction)10/1 ! "
+      "tensor_converter input-dim=4,2,4 input-type=int32,int32,int32 ! "
+      "other/tensors-flexible ! tensor_sink name=sinkx sync=false";
+  guint test_data[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+  ml_pipeline_h handle;
+  ml_pipeline_src_h srchandle;
+  ml_pipeline_sink_h sinkhandle;
+  ml_tensors_info_h in_info;
+  ml_tensors_data_h in_data;
+  ml_tensor_dimension dim = { 10, 1, 1, 1 };
+  gint i, status;
+  guint *count_sink;
+
+  count_sink = (guint *) g_malloc0 (sizeof (guint));
+  ASSERT_TRUE (count_sink != NULL);
+
+  /* prepare input data */
+  ml_tensors_info_create (&in_info);
+  ml_tensors_info_set_count (in_info, 1);
+  ml_tensors_info_set_tensor_type (in_info, 0, ML_TENSOR_TYPE_INT32);
+  ml_tensors_info_set_tensor_dimension (in_info, 0, dim);
+
+  ml_tensors_data_create (in_info, &in_data);
+  ml_tensors_data_set_tensor_data (in_data, 0, test_data, 10 * sizeof (gint));
+
+  /* start pipeline */
+  status = ml_pipeline_construct (pipeline, NULL, NULL, &handle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  status = ml_pipeline_src_get_handle (handle, "srcx", &srchandle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  status = ml_pipeline_sink_register (handle, "sinkx",
+      test_sink_callback_flex, count_sink, &sinkhandle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  status = ml_pipeline_start (handle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  /* push input data */
+  *count_sink = 0;
+  for (i = 0; i < 3; i++) {
+    g_usleep (50000);
+    status = ml_pipeline_src_input_data (srchandle, in_data,
+          ML_PIPELINE_BUF_POLICY_DO_NOT_FREE);
+    EXPECT_EQ (status, ML_ERROR_NONE);
+  }
+
+  wait_pipeline_process_buffers (*count_sink, 3);
+  g_usleep (300000);
+  EXPECT_EQ (*count_sink, 3U);
+
+  status = ml_pipeline_destroy (handle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  ml_tensors_info_destroy (in_info);
+  ml_tensors_data_destroy (in_data);
+  g_free (count_sink);
+}
+
+/**
+ * @brief Test NNStreamer pipeline for flexible tensors.
+ */
+TEST (nnstreamer_capi_flex, src_multi)
+{
+  gchar pipeline[] = "appsrc name=srcx caps=other/tensors-flexible,framerate=(fraction)10/1 ! "
+      "tensor_converter input-dim=4,2,4 input-type=int32,int32,int32 ! "
+      "tensor_sink name=sinkx sync=false";
+  guint test_data[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+  ml_pipeline_h handle;
+  ml_pipeline_src_h srchandle;
+  ml_pipeline_sink_h sinkhandle;
+  ml_tensors_info_h in_info;
+  ml_tensors_data_h in_data;
+  ml_tensor_dimension dim1 = { 4, 1, 1, 1 };
+  ml_tensor_dimension dim2 = { 2, 1, 1, 1 };
+  ml_tensor_dimension dim3 = { 4, 1, 1, 1 };
+  gint i, status;
+  guint *count_sink;
+
+  count_sink = (guint *) g_malloc0 (sizeof (guint));
+  ASSERT_TRUE (count_sink != NULL);
+
+  /* prepare input data */
+  ml_tensors_info_create (&in_info);
+  ml_tensors_info_set_count (in_info, 3);
+  ml_tensors_info_set_tensor_type (in_info, 0, ML_TENSOR_TYPE_INT32);
+  ml_tensors_info_set_tensor_dimension (in_info, 0, dim1);
+  ml_tensors_info_set_tensor_type (in_info, 1, ML_TENSOR_TYPE_INT32);
+  ml_tensors_info_set_tensor_dimension (in_info, 1, dim2);
+  ml_tensors_info_set_tensor_type (in_info, 2, ML_TENSOR_TYPE_INT32);
+  ml_tensors_info_set_tensor_dimension (in_info, 2, dim3);
+
+  ml_tensors_data_create (in_info, &in_data);
+  ml_tensors_data_set_tensor_data (in_data, 0, &test_data[0], 4 * sizeof (gint));
+  ml_tensors_data_set_tensor_data (in_data, 1, &test_data[4], 2 * sizeof (gint));
+  ml_tensors_data_set_tensor_data (in_data, 2, &test_data[6], 4 * sizeof (gint));
+
+  /* start pipeline */
+  status = ml_pipeline_construct (pipeline, NULL, NULL, &handle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  status = ml_pipeline_src_get_handle (handle, "srcx", &srchandle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  status = ml_pipeline_sink_register (handle, "sinkx",
+      test_sink_callback_flex, count_sink, &sinkhandle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  status = ml_pipeline_start (handle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  /* push input data */
+  *count_sink = 0;
+  for (i = 0; i < 3; i++) {
+    g_usleep (50000);
+    status = ml_pipeline_src_input_data (srchandle, in_data,
+          ML_PIPELINE_BUF_POLICY_DO_NOT_FREE);
+    EXPECT_EQ (status, ML_ERROR_NONE);
+  }
+
+  wait_pipeline_process_buffers (*count_sink, 3);
+  g_usleep (300000);
+  EXPECT_EQ (*count_sink, 3U);
+
+  status = ml_pipeline_destroy (handle);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  ml_tensors_info_destroy (in_info);
+  ml_tensors_data_destroy (in_data);
+  g_free (count_sink);
+}
+
+/**
  * @brief Main gtest
  */
 int