ret->maxid = 0;
ret->handle_id = 0;
ret->is_media_stream = FALSE;
+ ret->is_flexible_tensor = FALSE;
g_mutex_init (&ret->lock);
return ret;
}
* @brief Internal function to get the tensors info from the element caps.
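+ * The @a is_flexible flag is set when the caps describe a flexible tensor stream.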
*/
static gboolean
-get_tensors_info_from_caps (GstCaps * caps, ml_tensors_info_s * info)
+get_tensors_info_from_caps (GstCaps * caps, ml_tensors_info_s * info,
+ gboolean * is_flexible)
{
GstStructure *s;
GstTensorsConfig config;
if (found) {
ml_tensors_info_copy_from_gst (info, &config.info);
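+ /* a flexible stream negotiates only the format in caps; the exact
+ * info arrives with each buffer, so report flexibility to the caller */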
+ *is_flexible = gst_tensors_config_is_flexible (&config);
break;
}
}
GList *l;
ml_tensors_data_s *_data = NULL;
ml_tensors_info_s *_info;
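+ /* per-buffer tensors info of a flexible stream, filled from the flex headers */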
+ ml_tensors_info_s info_flex_tensor;
size_t total_size = 0;
int status;
GstCaps *caps = gst_pad_get_current_caps (elem->sink);
if (caps) {
- gboolean found;
- found = get_tensors_info_from_caps (caps, _info);
+ gboolean flexible = FALSE;
+ gboolean found = get_tensors_info_from_caps (caps, _info, &flexible);
gst_caps_unref (caps);
if (found) {
elem->size = 0;
+ /* flexible tensor: exact info is not in caps, parse it from the buffer */
+ if (flexible) {
+ elem->is_flexible_tensor = TRUE;
+ goto send_cb;
+ }
+
if (_info->num_tensors != num_mems) {
ml_loge
("The sink event of [%s] cannot be handled because the number of tensors mismatches.",
goto error;
}
+send_cb:
+ /* build the tensors info of a flexible stream from the per-buffer headers */
+ if (elem->is_flexible_tensor) {
+ GstTensorMetaInfo meta;
+ GstTensorsInfo gst_info;
+ gsize hsize;
+
+ gst_tensors_info_init (&gst_info);
+ gst_info.num_tensors = num_mems;
+ _info = &info_flex_tensor;
+
+ /* each memory of a flexible tensor starts with a serialized
+ * GstTensorMetaInfo header followed by the raw data; parse the
+ * header to recover the tensor info and skip to the payload */
+ for (i = 0; i < num_mems; i++) {
+ gst_tensor_meta_info_parse_header (&meta, map[i].data);
+ hsize = gst_tensor_meta_info_get_header_size (&meta);
+
+ gst_tensor_meta_info_convert (&meta, &gst_info.info[i]);
+
+ _data->tensors[i].tensor = map[i].data + hsize;
+ _data->tensors[i].size = map[i].size - hsize;
+ }
+
+ ml_tensors_info_copy_from_gst (_info, &gst_info);
+ }
+
/* Iterate e->handles, pass the data to them */
for (l = elem->handles; l != NULL; l = l->next) {
ml_pipeline_sink_cb callback;
} else {
GstCaps *caps = gst_pad_get_allowed_caps (elem->src);
guint i;
- gboolean found = FALSE;
+ gboolean found, flexible;
size_t sz;
+ found = flexible = FALSE;
if (caps) {
- found = get_tensors_info_from_caps (caps, _info);
+ found = get_tensors_info_from_caps (caps, _info, &flexible);
if (!found && gst_caps_is_fixed (caps)) {
- GstStructure *caps_s;
- const gchar *mimetype;
-
- caps_s = gst_caps_get_structure (caps, 0);
- mimetype = gst_structure_get_name (caps_s);
- if (!g_str_equal (mimetype, "other/tensor") &&
- !g_str_equal (mimetype, "other/tensors")) {
+ GstStructure *st = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_is_tensor_stream (st))
elem->is_media_stream = TRUE;
- }
+ } else if (found && flexible) {
+ /* flexible tensor, cannot get exact info from caps. */
+ elem->is_flexible_tensor = TRUE;
}
gst_caps_unref (caps);
}
- if (found) {
+ if (found && !flexible) {
for (i = 0; i < _info->num_tensors; i++) {
sz = ml_tensor_info_get_size (&_info->info[i]);
elem->size += sz;
}
} else {
- if (!elem->is_media_stream) {
+ if (!elem->is_media_stream && !elem->is_flexible_tensor) {
ml_logw
("Cannot find caps. The pipeline is not yet negotiated for src element [%s].",
elem->name);
ml_pipeline_buf_policy_e policy)
{
GstBuffer *buffer;
- GstMemory *mem;
+ GstMemory *mem, *tmp;
gpointer mem_data;
gsize mem_size;
GstFlowReturn gret;
+ GstTensorsInfo gst_info;
ml_tensors_data_s *_data;
unsigned int i;
goto dont_destroy_data;
}
- if (!elem->is_media_stream) {
+ if (!elem->is_media_stream && !elem->is_flexible_tensor) {
if (elem->tensors_info.num_tensors != _data->num_tensors) {
ml_loge
("The src push of [%s] cannot be handled because the number of tensors in a frame mismatches. %u != %u",
/* Create buffer to be pushed from buf[] */
buffer = gst_buffer_new ();
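+ /* GStreamer tensors info, used below to build the flex tensor headers */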
+ ml_tensors_info_copy_from_ml (&gst_info, &_data->info);
+
for (i = 0; i < _data->num_tensors; i++) {
mem_data = _data->tensors[i].tensor;
mem_size = _data->tensors[i].size;
- mem = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
+ mem = tmp = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY,
mem_data, mem_size, 0, mem_size, mem_data,
(policy == ML_PIPELINE_BUF_POLICY_AUTO_FREE) ? g_free : NULL);
+ /* flex tensor: prepend the serialized GstTensorMetaInfo header so
+ * downstream elements can parse the dimension and type per buffer */
+ if (elem->is_flexible_tensor) {
+ GstTensorMetaInfo meta;
+
+ gst_tensor_info_convert_to_meta (&gst_info.info[i], &meta);
+
+ mem = gst_tensor_meta_info_append_header (&meta, tmp);
+ gst_memory_unref (tmp);
+ }
+
gst_buffer_append_memory (buffer, mem);
/** @todo Verify that gst_buffer_append lists tensors/gstmem in the correct order */
}
+ gst_tensors_info_free (&gst_info);
+
/* Unlock if it's not auto-free. We do not know when it'll be freed. */
if (policy != ML_PIPELINE_BUF_POLICY_AUTO_FREE)
G_UNLOCK_UNLESS_NOLOCK (*_data);
}
/**
+ * @brief A tensor-sink callback for the flexible tensor test pipelines.
+ * Verifies that the 10-int input arrives as three tensors:
+ * {1,2,3,4}, {5,6}, and {7,8,9,10}.
+ */
+static void
+test_sink_callback_flex (const ml_tensors_data_h data,
+ const ml_tensors_info_h info, void *user_data)
+{
+ guint *count = (guint *) user_data;
+ gint status;
+ gint *received;
+ guint total = 0;
+ size_t data_size;
+
+ G_LOCK (callback_lock);
+ *count = *count + 1;
+
+ status = ml_tensors_info_get_count (info, &total);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+ EXPECT_EQ (total, 3U);
+
+ status = ml_tensors_data_get_tensor_data (data, 0, (void **) &received, &data_size);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+ EXPECT_EQ (data_size, 4 * sizeof (gint));
+ EXPECT_EQ (received[0], 1);
+ EXPECT_EQ (received[1], 2);
+ EXPECT_EQ (received[2], 3);
+ EXPECT_EQ (received[3], 4);
+
+ status = ml_tensors_data_get_tensor_data (data, 1, (void **) &received, &data_size);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+ EXPECT_EQ (data_size, 2 * sizeof (gint));
+ EXPECT_EQ (received[0], 5);
+ EXPECT_EQ (received[1], 6);
+
+ status = ml_tensors_data_get_tensor_data (data, 2, (void **) &received, &data_size);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+ EXPECT_EQ (data_size, 4 * sizeof (gint));
+ EXPECT_EQ (received[0], 7);
+ EXPECT_EQ (received[1], 8);
+ EXPECT_EQ (received[2], 9);
+ EXPECT_EQ (received[3], 10);
+
+ G_UNLOCK (callback_lock);
+}
+
+/**
+ * @brief Test NNStreamer pipeline for flexible tensors (sink handle).
+ */
+TEST (nnstreamer_capi_flex, sink_multi)
+{
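+ /* a single 10-int tensor is split by tensor_converter into three
+ * tensors (4,2,4); the flexible capsfilter makes the sink handle
+ * exercise the flex-tensor parsing path */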
+ gchar pipeline[] = "appsrc name=srcx caps=application/octet-stream,framerate=(fraction)10/1 ! "
+ "tensor_converter input-dim=4,2,4 input-type=int32,int32,int32 ! "
+ "other/tensors-flexible ! tensor_sink name=sinkx sync=false";
+ guint test_data[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+ ml_pipeline_h handle;
+ ml_pipeline_src_h srchandle;
+ ml_pipeline_sink_h sinkhandle;
+ ml_tensors_info_h in_info;
+ ml_tensors_data_h in_data;
+ ml_tensor_dimension dim = { 10, 1, 1, 1 };
+ gint i, status;
+ guint *count_sink;
+
+ count_sink = (guint *) g_malloc0 (sizeof (guint));
+ ASSERT_TRUE (count_sink != NULL);
+
+ /* prepare input data */
+ ml_tensors_info_create (&in_info);
+ ml_tensors_info_set_count (in_info, 1);
+ ml_tensors_info_set_tensor_type (in_info, 0, ML_TENSOR_TYPE_INT32);
+ ml_tensors_info_set_tensor_dimension (in_info, 0, dim);
+
+ ml_tensors_data_create (in_info, &in_data);
+ ml_tensors_data_set_tensor_data (in_data, 0, test_data, 10 * sizeof (gint));
+
+ /* start pipeline */
+ status = ml_pipeline_construct (pipeline, NULL, NULL, &handle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ status = ml_pipeline_src_get_handle (handle, "srcx", &srchandle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ status = ml_pipeline_sink_register (handle, "sinkx",
+ test_sink_callback_flex, count_sink, &sinkhandle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ status = ml_pipeline_start (handle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ /* push input data */
+ *count_sink = 0;
+ for (i = 0; i < 3; i++) {
+ g_usleep (50000);
+ status = ml_pipeline_src_input_data (srchandle, in_data,
+ ML_PIPELINE_BUF_POLICY_DO_NOT_FREE);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+ }
+
+ wait_pipeline_process_buffers (*count_sink, 3);
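+ /* linger to confirm no extra buffers arrive before checking the count */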
+ g_usleep (300000);
+ EXPECT_EQ (*count_sink, 3U);
+
+ status = ml_pipeline_destroy (handle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ ml_tensors_info_destroy (in_info);
+ ml_tensors_data_destroy (in_data);
+ g_free (count_sink);
+}
+
+/**
+ * @brief Test NNStreamer pipeline for flexible tensors (src handle).
+ */
+TEST (nnstreamer_capi_flex, src_multi)
+{
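+ /* appsrc negotiates flexible caps, so the src push path appends a
+ * meta header to each tensor; tensor_converter restores the static
+ * tensors for the sink callback */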
+ gchar pipeline[] = "appsrc name=srcx caps=other/tensors-flexible,framerate=(fraction)10/1 ! "
+ "tensor_converter input-dim=4,2,4 input-type=int32,int32,int32 ! "
+ "tensor_sink name=sinkx sync=false";
+ guint test_data[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+ ml_pipeline_h handle;
+ ml_pipeline_src_h srchandle;
+ ml_pipeline_sink_h sinkhandle;
+ ml_tensors_info_h in_info;
+ ml_tensors_data_h in_data;
+ ml_tensor_dimension dim1 = { 4, 1, 1, 1 };
+ ml_tensor_dimension dim2 = { 2, 1, 1, 1 };
+ ml_tensor_dimension dim3 = { 4, 1, 1, 1 };
+ gint i, status;
+ guint *count_sink;
+
+ count_sink = (guint *) g_malloc0 (sizeof (guint));
+ ASSERT_TRUE (count_sink != NULL);
+
+ /* prepare input data */
+ ml_tensors_info_create (&in_info);
+ ml_tensors_info_set_count (in_info, 3);
+ ml_tensors_info_set_tensor_type (in_info, 0, ML_TENSOR_TYPE_INT32);
+ ml_tensors_info_set_tensor_dimension (in_info, 0, dim1);
+ ml_tensors_info_set_tensor_type (in_info, 1, ML_TENSOR_TYPE_INT32);
+ ml_tensors_info_set_tensor_dimension (in_info, 1, dim2);
+ ml_tensors_info_set_tensor_type (in_info, 2, ML_TENSOR_TYPE_INT32);
+ ml_tensors_info_set_tensor_dimension (in_info, 2, dim3);
+
+ ml_tensors_data_create (in_info, &in_data);
+ ml_tensors_data_set_tensor_data (in_data, 0, &test_data[0], 4 * sizeof (gint));
+ ml_tensors_data_set_tensor_data (in_data, 1, &test_data[4], 2 * sizeof (gint));
+ ml_tensors_data_set_tensor_data (in_data, 2, &test_data[6], 4 * sizeof (gint));
+
+ /* start pipeline */
+ status = ml_pipeline_construct (pipeline, NULL, NULL, &handle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ status = ml_pipeline_src_get_handle (handle, "srcx", &srchandle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ status = ml_pipeline_sink_register (handle, "sinkx",
+ test_sink_callback_flex, count_sink, &sinkhandle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ status = ml_pipeline_start (handle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ /* push input data */
+ *count_sink = 0;
+ for (i = 0; i < 3; i++) {
+ g_usleep (50000);
+ status = ml_pipeline_src_input_data (srchandle, in_data,
+ ML_PIPELINE_BUF_POLICY_DO_NOT_FREE);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+ }
+
+ wait_pipeline_process_buffers (*count_sink, 3);
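+ /* linger to confirm no extra buffers arrive before checking the count */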
+ g_usleep (300000);
+ EXPECT_EQ (*count_sink, 3U);
+
+ status = ml_pipeline_destroy (handle);
+ EXPECT_EQ (status, ML_ERROR_NONE);
+
+ ml_tensors_info_destroy (in_info);
+ ml_tensors_data_destroy (in_data);
+ g_free (count_sink);
+}
+
+/**
* @brief Main gtest
*/
int