[Android/Api] add custom-filter callback
authorJaeyun <jy1210.jung@samsung.com>
Mon, 29 Jul 2019 04:00:01 +0000 (13:00 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Thu, 1 Aug 2019 05:40:25 +0000 (14:40 +0900)
1. Add custom-filter in java to execute tensors data in the application.
2. Update sample code to set the custom-filter.

Signed-off-by: Jaeyun Jung <jy1210.jung@samsung.com>
api/android/api/jni/Android.mk
api/android/api/jni/nnstreamer-native-api.c
api/android/api/jni/nnstreamer-native-customfilter.c [new file with mode: 0644]
api/android/api/jni/nnstreamer-native.h
api/android/api/src/com/samsung/android/nnstreamer/CustomFilter.java [new file with mode: 0644]
api/android/sample/src/main/java/com/samsung/android/nnstreamer/sample/MainActivity.java

index cc9dd89..e11e731 100644 (file)
@@ -34,10 +34,11 @@ include $(CLEAR_VARS)
 
 LOCAL_MODULE := nnstreamer-native
 LOCAL_SRC_FILES := nnstreamer-native-api.c \
+    nnstreamer-native-customfilter.c \
     nnstreamer-native-pipeline.c \
     nnstreamer-native-singleshot.c
 LOCAL_CFLAGS += -O2 -DVERSION=\"$(NNSTREAMER_VERSION)\"
-LOCAL_C_INCLUDES := $(NNSTREAMER_CAPI_INCLUDES)
+LOCAL_C_INCLUDES := $(NNSTREAMER_INCLUDES) $(NNSTREAMER_CAPI_INCLUDES)
 LOCAL_STATIC_LIBRARIES := nnstreamer tensorflow-lite cpufeatures
 LOCAL_SHARED_LIBRARIES := gstreamer_android
 LOCAL_LDLIBS := -llog -landroid
index 2cbe4d7..882e1f1 100644 (file)
@@ -147,6 +147,11 @@ nns_destroy_pipe_info (pipeline_info_s * pipe_info, JNIEnv * env)
     ml_pipeline_destroy (pipe_info->pipeline_handle);
   } else if (g_str_equal (pipe_info->pipeline_type, NNS_PIPE_TYPE_SINGLE)) {
     ml_single_close (pipe_info->pipeline_handle);
+  } else if (g_str_equal (pipe_info->pipeline_type, NNS_PIPE_TYPE_CUSTOM)) {
+    /**
+     * Do nothing here (no handle to close).
+     * The handle is filter-framework and it will be closed in customfilter-destroy function.
+     */
   } else {
     nns_logw ("Given pipe type %s is unknown.", pipe_info->pipeline_type);
     if (pipe_info->pipeline_handle)
diff --git a/api/android/api/jni/nnstreamer-native-customfilter.c b/api/android/api/jni/nnstreamer-native-customfilter.c
new file mode 100644 (file)
index 0000000..eaafd94
--- /dev/null
@@ -0,0 +1,300 @@
+/**
+ * NNStreamer Android API
+ * Copyright (C) 2019 Samsung Electronics Co., Ltd.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ */
+
+/**
+ * @file       nnstreamer-native-customfilter.c
+ * @date       10 July 2019
+ * @brief      Native code for NNStreamer API
+ * @author     Jaeyun Jung <jy1210.jung@samsung.com>
+ * @bug                No known bugs except for NYI items
+ */
+
+#include "nnstreamer-native.h"
+
+/**
+ * @brief Table to handle custom-filter.
+ */
+static GHashTable *g_customfilters = NULL;
+
+/**
+ * @brief The mandatory callback for GstTensorFilterFramework.
+ * @param prop The property of tensor_filter instance
+ * @param private_data Sub-plugin's private data
+ * @param[in] input The array of input tensors
+ * @param[out] output The array of output tensors
+ * @return 0 if OK. Non-zero if error.
+ */
+static int
+nns_customfilter_invoke (const GstTensorFilterProperties * prop, void **private_data,
+    const GstTensorMemory * input, GstTensorMemory * output)
+{
+  pipeline_info_s *pipe_info = NULL;
+  ml_tensors_data_s *in_data, *out_data;
+  ml_tensors_info_s *in_info, *out_info;
+  JNIEnv *env;
+  guint i;
+  int ret = -1;
+
+  /* get pipe info and init */
+  pipe_info = g_hash_table_lookup (g_customfilters, prop->fwname);
+  g_return_val_if_fail (pipe_info, -1);
+
+  env = nns_get_jni_env (pipe_info);
+  g_return_val_if_fail (env, -1);
+
+  in_data = g_new0 (ml_tensors_data_s, 1);
+  g_assert (in_data);
+
+  out_data = g_new0 (ml_tensors_data_s, 1);
+  g_assert (out_data);
+
+  in_info = g_new0 (ml_tensors_info_s, 1);
+  g_assert (in_info);
+
+  out_info = g_new0 (ml_tensors_info_s, 1);
+  g_assert (out_info);
+
+  /* convert to c-api data type */
+  in_data->num_tensors = prop->input_meta.num_tensors;
+  for (i = 0; i < in_data->num_tensors; i++) {
+    in_data->tensors[i].tensor = input[i].data;
+    in_data->tensors[i].size = input[i].size;
+  }
+
+  ml_tensors_info_copy_from_gst (in_info, &prop->input_meta);
+  ml_tensors_info_copy_from_gst (out_info, &prop->output_meta);
+
+  /* call invoke callback */
+  jobject obj_in_data, obj_out_data;
+  jobject obj_in_info, obj_out_info;
+
+  obj_in_data = obj_out_data = NULL;
+  obj_in_info = obj_out_info = NULL;
+
+  if (!nns_convert_tensors_info (pipe_info, env, in_info, &obj_in_info)) {
+    nns_loge ("Failed to convert input info to info-object.");
+    goto done;
+  }
+
+  if (!nns_convert_tensors_info (pipe_info, env, out_info, &obj_out_info)) {
+    nns_loge ("Failed to convert output info to info-object.");
+    goto done;
+  }
+
+  if (!nns_convert_tensors_data (pipe_info, env, in_data, &obj_in_data)) {
+    nns_loge ("Failed to convert input data to data-object.");
+    goto done;
+  }
+
+  jclass cls_custom = (*env)->GetObjectClass (env, pipe_info->instance);
+  jmethodID mid_invoke = (*env)->GetMethodID (env, cls_custom, "invoke",
+      "(Lcom/samsung/android/nnstreamer/TensorsData;"
+      "Lcom/samsung/android/nnstreamer/TensorsInfo;"
+      "Lcom/samsung/android/nnstreamer/TensorsInfo;)"
+      "Lcom/samsung/android/nnstreamer/TensorsData;");
+
+  obj_out_data = (*env)->CallObjectMethod (env, pipe_info->instance, mid_invoke,
+      obj_in_data, obj_in_info, obj_out_info);
+  if (!nns_parse_tensors_data (pipe_info, env, obj_out_data, out_data)) {
+    nns_loge ("Failed to parse output data.");
+    goto done;
+  }
+
+  /* set output data */
+  for (i = 0; i < out_data->num_tensors; i++) {
+    output[i].data = out_data->tensors[i].tensor;
+
+    if (out_data->tensors[i].size != output[i].size) {
+      nns_logw ("The result has different buffer size at index %d [%zd:%zd]",
+          i, output[i].size, out_data->tensors[i].size);
+      output[i].size = out_data->tensors[i].size;
+    }
+  }
+
+  /* callback finished */
+  ret = 0;
+
+done:
+  if (obj_in_data)
+    (*env)->DeleteLocalRef (env, obj_in_data);
+  if (obj_out_data)
+    (*env)->DeleteLocalRef (env, obj_out_data);
+  if (obj_in_info)
+    (*env)->DeleteLocalRef (env, obj_in_info);
+  if (obj_out_info)
+    (*env)->DeleteLocalRef (env, obj_out_info);
+  (*env)->DeleteLocalRef (env, cls_custom);
+
+  g_free (in_data);
+  g_free (out_data);
+  ml_tensors_info_destroy ((ml_tensors_info_h) in_info);
+  ml_tensors_info_destroy ((ml_tensors_info_h) out_info);
+  return ret;
+}
+
+/**
+ * @brief The optional callback for GstTensorFilterFramework.
+ * @param prop The property of tensor_filter instance
+ * @param private_data Sub-plugin's private data
+ * @param[in] in_info The dimension and type of input tensors
+ * @param[out] out_info The dimension and type of output tensors
+ * @return 0 if OK. Non-zero if error.
+ */
+static int
+nns_customfilter_set_dimension (const GstTensorFilterProperties * prop, void **private_data,
+    const GstTensorsInfo * in_info, GstTensorsInfo * out_info)
+{
+  pipeline_info_s *pipe_info = NULL;
+  ml_tensors_info_s *in, *out;
+  JNIEnv *env;
+  int ret = -1;
+
+  /* get pipe info and init */
+  pipe_info = g_hash_table_lookup (g_customfilters, prop->fwname);
+  g_return_val_if_fail (pipe_info, -1);
+
+  env = nns_get_jni_env (pipe_info);
+  g_return_val_if_fail (env, -1);
+
+  in = g_new0 (ml_tensors_info_s, 1);
+  g_assert (in);
+
+  out = g_new0 (ml_tensors_info_s, 1);
+  g_assert (out);
+
+  /* convert to c-api data type */
+  ml_tensors_info_copy_from_gst (in, in_info);
+
+  /* call output info callback */
+  jobject obj_in_info, obj_out_info;
+
+  obj_in_info = obj_out_info = NULL;
+  if (!nns_convert_tensors_info (pipe_info, env, in, &obj_in_info)) {
+    nns_loge ("Failed to convert input tensors info to data object.");
+    goto done;
+  }
+
+  jclass cls_custom = (*env)->GetObjectClass (env, pipe_info->instance);
+  jmethodID mid_info = (*env)->GetMethodID (env, cls_custom, "getOutputInfo",
+      "(Lcom/samsung/android/nnstreamer/TensorsInfo;)Lcom/samsung/android/nnstreamer/TensorsInfo;");
+
+  obj_out_info = (*env)->CallObjectMethod (env, pipe_info->instance, mid_info, obj_in_info);
+  if (!obj_out_info || !nns_parse_tensors_info (pipe_info, env, obj_out_info, out)) {
+    nns_loge ("Failed to parse output info.");
+    goto done;
+  }
+
+  /* set output data */
+  ml_tensors_info_copy_from_ml (out_info, out);
+
+  /* callback finished */
+  ret = 0;
+
+done:
+  if (obj_in_info)
+    (*env)->DeleteLocalRef (env, obj_in_info);
+  if (obj_out_info)
+    (*env)->DeleteLocalRef (env, obj_out_info);
+  (*env)->DeleteLocalRef (env, cls_custom);
+
+  ml_tensors_info_destroy ((ml_tensors_info_h) in);
+  ml_tensors_info_destroy ((ml_tensors_info_h) out);
+  return ret;
+}
+
+/**
+ * @brief Native method for custom filter.
+ */
+jlong
+Java_com_samsung_android_nnstreamer_CustomFilter_nativeInitialize (JNIEnv * env, jobject thiz,
+    jstring name)
+{
+  pipeline_info_s *pipe_info = NULL;
+  GstTensorFilterFramework *fw = NULL;
+  const char *filter_name = (*env)->GetStringUTFChars (env, name, NULL);
+
+  nns_logd ("Try to add custom-filter %s.", filter_name);
+
+  if (nnstreamer_filter_find (filter_name)) {
+    nns_logw ("Custom-filter %s already exists.", filter_name);
+    goto done;
+  }
+
+  /* prepare filter-framework */
+  fw = g_new0 (GstTensorFilterFramework, 1);
+  if (!fw) {
+    nns_loge ("Failed to allocate memory for filter framework.");
+    goto done;
+  }
+
+  fw->name = g_strdup (filter_name);
+  fw->allocate_in_invoke = TRUE;
+  fw->run_without_model = TRUE;
+  fw->invoke_NN = nns_customfilter_invoke;
+  fw->setInputDimension = nns_customfilter_set_dimension;
+
+  if (!nnstreamer_filter_probe (fw)) {
+    nns_loge ("Failed to register custom-filter %s.", filter_name);
+    g_free (fw->name);
+    g_free (fw);
+    goto done;
+  }
+
+  pipe_info = nns_construct_pipe_info (env, thiz, fw, NNS_PIPE_TYPE_CUSTOM);
+
+  /* add custom-filter handle to the table */
+  g_mutex_lock (&pipe_info->lock);
+
+  if (g_customfilters == NULL) {
+    g_customfilters = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
+  }
+
+  g_assert (g_hash_table_insert (g_customfilters, g_strdup (filter_name), pipe_info));
+
+  g_mutex_unlock (&pipe_info->lock);
+
+done:
+  (*env)->ReleaseStringUTFChars (env, name, filter_name);
+  return CAST_TO_LONG (pipe_info);
+}
+
+/**
+ * @brief Native method for custom filter.
+ */
+void
+Java_com_samsung_android_nnstreamer_CustomFilter_nativeDestroy (JNIEnv * env, jobject thiz,
+    jlong handle)
+{
+  pipeline_info_s *pipe_info = NULL;
+  GstTensorFilterFramework *fw = NULL;
+
+  pipe_info = CAST_TO_TYPE (handle, pipeline_info_s*);
+  g_return_if_fail (pipe_info);
+
+  fw = (GstTensorFilterFramework *) pipe_info->pipeline_handle;
+  nns_logd ("Start to unregister custom-filter %s.", fw->name);
+
+  g_mutex_lock (&pipe_info->lock);
+  if (!g_hash_table_remove (g_customfilters, fw->name)) {
+    nns_logw ("Failed to remove custom-filter %s.", fw->name);
+  }
+  g_mutex_unlock (&pipe_info->lock);
+
+  nnstreamer_filter_exit (fw->name);
+  g_free (fw->name);
+  g_free (fw);
+
+  nns_destroy_pipe_info (pipe_info, env);
+}
index fa431c7..af34c4c 100644 (file)
@@ -32,6 +32,7 @@
 #include "nnstreamer.h"
 #include "nnstreamer-single.h"
 #include "nnstreamer-capi-private.h"
+#include "nnstreamer_plugin_api_filter.h"
 
 #ifndef DBG
 #define DBG FALSE
@@ -67,6 +68,7 @@
 
 #define NNS_PIPE_TYPE_PIPELINE "pipeline"
 #define NNS_PIPE_TYPE_SINGLE "single"
+#define NNS_PIPE_TYPE_CUSTOM "custom-filter"
 
 #define NNS_ELEMENT_TYPE_SRC "src"
 #define NNS_ELEMENT_TYPE_SINK "sink"
diff --git a/api/android/api/src/com/samsung/android/nnstreamer/CustomFilter.java b/api/android/api/src/com/samsung/android/nnstreamer/CustomFilter.java
new file mode 100644 (file)
index 0000000..d94fa54
--- /dev/null
@@ -0,0 +1,161 @@
+/*
+ * NNStreamer Android API
+ * Copyright (C) 2019 Samsung Electronics Co., Ltd.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ */
+
+package com.samsung.android.nnstreamer;
+
+import android.support.annotation.NonNull;
+
+/**
+ * Provides interfaces to create a custom-filter in the pipeline.<br>
+ * <br>
+ * To register a new custom-filter, an application should call {@link #registerCustomFilter(String, CustomFilterCallback)}
+ * before constructing the pipeline.
+ */
+public class CustomFilter implements AutoCloseable {
+    private long mHandle = 0;
+    private String mName = null;
+    private CustomFilterCallback mCallback = null;
+
+    private native long nativeInitialize(String name);
+    private native void nativeDestroy(long handle);
+
+    /**
+     * Interface definition for a callback to be invoked while processing the pipeline.
+     */
+    public interface CustomFilterCallback {
+        /**
+         * Called synchronously when constructing a pipeline.
+         *
+         * NNStreamer filter configures input and output tensors information during the caps negotiation.
+         *
+         * Note that this is not a fixed value and the pipeline may try different values during the cap negotiations.
+         * An application should validate the information of input tensors and return proper output information.
+         *
+         * @param inInfo The input tensors information
+         *
+         * @return The output tensors information
+         */
+        TensorsInfo getOutputInfo(TensorsInfo inInfo);
+
+        /**
+         * Called synchronously while processing the pipeline.
+         *
+         * NNStreamer filter invokes the given custom-filter callback while processing the pipeline.
+         *
+         * @param inData  The input data (a single frame, tensor/tensors)
+         * @param inInfo  The input tensors information
+         * @param outInfo The output tensors information
+         *
+         * @return The output data (a single frame, tensor/tensors)
+         */
+        TensorsData invoke(TensorsData inData, TensorsInfo inInfo, TensorsInfo outInfo);
+    }
+
+    /**
+     * Registers new custom-filter with name.
+     *
+     * Note that if given name is duplicated in the pipeline, the registration will be failed and throw an exception.
+     *
+     * @param name     The name of custom-filter
+     * @param callback The function to be called while processing the pipeline
+     *
+     * @return <code>CustomFilter</code> instance
+     *
+     * @throws IllegalArgumentException if given param is null
+     * @throws IllegalStateException if failed to initialize custom-filter
+     */
+    public static CustomFilter registerCustomFilter(@NonNull String name, @NonNull CustomFilterCallback callback) {
+        return new CustomFilter(name, callback);
+    }
+
+    /**
+     * Gets the name of custom-filter.
+     *
+     * @return The name of custom-filter
+     */
+    public String getName() {
+        return mName;
+    }
+
+    /**
+     * Internal constructor to create and register a custom-filter.
+     *
+     * @param name     The name of custom-filter
+     * @param callback The function to be called while processing the pipeline
+     *
+     * @throws IllegalArgumentException if given param is null
+     * @throws IllegalStateException if failed to initialize custom-filter
+     */
+    private CustomFilter(@NonNull String name, @NonNull CustomFilterCallback callback) {
+        if (name == null) {
+            throw new IllegalArgumentException("The param name is null");
+        }
+
+        if (callback == null) {
+            throw new IllegalArgumentException("The param callback is null");
+        }
+
+        mName = name;
+        mCallback = callback;
+
+        mHandle = nativeInitialize(name);
+        if (mHandle == 0) {
+            throw new IllegalStateException("Failed to initialize custom-filter " + name);
+        }
+    }
+
+    /**
+     * Internal method called from native during the caps negotiation.
+     */
+    private TensorsInfo getOutputInfo(TensorsInfo info) {
+        TensorsInfo out = null;
+
+        if (mCallback != null) {
+            out = mCallback.getOutputInfo(info);
+        }
+
+        return out;
+    }
+
+    /**
+     * Internal method called from native while processing the pipeline.
+     */
+    private TensorsData invoke(TensorsData inData, TensorsInfo inInfo, TensorsInfo outInfo) {
+        TensorsData out = null;
+
+        if (mCallback != null) {
+            out = mCallback.invoke(inData, inInfo, outInfo);
+        }
+
+        return out;
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            close();
+        } finally {
+            super.finalize();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (mHandle != 0) {
+            nativeDestroy(mHandle);
+            mHandle = 0;
+        }
+    }
+}
index 21f681e..d75e200 100644 (file)
@@ -10,6 +10,7 @@ import android.support.v4.app.ActivityCompat;
 import android.support.v4.content.ContextCompat;
 import android.util.Log;
 
+import com.samsung.android.nnstreamer.CustomFilter;
 import com.samsung.android.nnstreamer.NNStreamer;
 import com.samsung.android.nnstreamer.Pipeline;
 import com.samsung.android.nnstreamer.SingleShot;
@@ -146,7 +147,7 @@ public class MainActivity extends Activity {
                     return;
                 }
 
-                int option = (exampleRun % 5);
+                int option = (exampleRun % 6);
 
                 if (option == 1) {
                     Log.d(TAG, "==== Run pipeline example with state callback ====");
@@ -160,6 +161,9 @@ public class MainActivity extends Activity {
                 } else if (option == 4) {
                     Log.d(TAG, "==== Run pipeline example with switch ====");
                     runPipeSwitch();
+                } else if (option == 5) {
+                    Log.d(TAG, "==== Run pipeline example with custom filter ====");
+                    runPipeCustomFilter();
                 } else {
                     Log.d(TAG, "==== Run single-shot example ====");
                     runSingle();
@@ -500,4 +504,146 @@ public class MainActivity extends Activity {
             isFailed = true;
         }
     }
+
+    /**
+     * Example to run pipeline with custom filter.
+     */
+    private void runPipeCustomFilter() {
+        try {
+            /* register custom-filter (passthrough) */
+            CustomFilter customPassthrough = CustomFilter.registerCustomFilter("custom-passthrough",
+                    new CustomFilter.CustomFilterCallback() {
+                @Override
+                public TensorsInfo getOutputInfo(TensorsInfo inInfo) {
+                    Log.d(TAG, "Received info callback in custom-passthrough");
+                    return inInfo;
+                }
+
+                @Override
+                public TensorsData invoke(TensorsData inData, TensorsInfo inInfo, TensorsInfo outInfo) {
+                    Log.d(TAG, "Received invoke callback in custom-passthrough");
+                    return inData;
+                }
+            });
+
+            /* register custom-filter (convert data type to float) */
+            CustomFilter customConvert = CustomFilter.registerCustomFilter("custom-convert",
+                    new CustomFilter.CustomFilterCallback() {
+                @Override
+                public TensorsInfo getOutputInfo(TensorsInfo inInfo) {
+                    Log.d(TAG, "Received info callback in custom-convert");
+
+                    TensorsInfo out = inInfo;
+                    out.setTensorType(0, NNStreamer.TENSOR_TYPE_FLOAT32);
+
+                    return out;
+                }
+
+                @Override
+                public TensorsData invoke(TensorsData inData, TensorsInfo inInfo, TensorsInfo outInfo) {
+                    Log.d(TAG, "Received invoke callback in custom-convert");
+
+                    ByteBuffer input = inData.getTensorData(0);
+                    ByteBuffer output = TensorsData.allocateByteBuffer(4 * 10);
+
+                    for (int i = 0; i < 10; i++) {
+                        float value = (float) input.getInt(i * 4);
+                        output.putFloat(i * 4, value);
+                    }
+
+                    TensorsData out = new TensorsData();
+                    out.addTensorData(output);
+
+                    return out;
+                }
+            });
+
+            /* register custom-filter (add constant) */
+            CustomFilter customAdd = CustomFilter.registerCustomFilter("custom-add",
+                    new CustomFilter.CustomFilterCallback() {
+                @Override
+                public TensorsInfo getOutputInfo(TensorsInfo inInfo) {
+                    Log.d(TAG, "Received info callback in custom-add");
+                    return inInfo;
+                }
+
+                @Override
+                public TensorsData invoke(TensorsData inData, TensorsInfo inInfo, TensorsInfo outInfo) {
+                    Log.d(TAG, "Received invoke callback in custom-add");
+
+                    ByteBuffer input = inData.getTensorData(0);
+                    ByteBuffer output = TensorsData.allocateByteBuffer(4 * 10);
+
+                    for (int i = 0; i < 10; i++) {
+                        float value = input.getFloat(i * 4);
+
+                        /* add constant */
+                        value += 1.5;
+                        output.putFloat(i * 4, value);
+                    }
+
+                    TensorsData out = new TensorsData();
+                    out.addTensorData(output);
+
+                    return out;
+                }
+            });
+
+            String desc = "appsrc name=srcx ! other/tensor,dimension=(string)10:1:1:1,type=(string)int32,framerate=(fraction)0/1 ! " +
+                    "tensor_filter framework=" + customPassthrough.getName() + " ! " +
+                    "tensor_filter framework=" + customConvert.getName() + " ! " +
+                    "tensor_filter framework=" + customAdd.getName() + " ! " +
+                    "tensor_sink name=sinkx";
+
+            Pipeline pipe = new Pipeline(desc);
+
+            /* register sink callback */
+            pipe.setSinkCallback("sinkx", new Pipeline.NewDataCallback() {
+                int received = 0;
+
+                @Override
+                public void onNewDataReceived(TensorsData data, TensorsInfo info) {
+                    Log.d(TAG, "Received new data callback at sinkx " + (++received));
+
+                    printTensorsInfo(info);
+                    printTensorsData(data);
+
+                    ByteBuffer output = data.getTensorData(0);
+
+                    for (int i = 0; i < 10; i++) {
+                        Log.d(TAG, "Received data: index " + i + " value " + output.getFloat(i * 4));
+                    }
+                }
+            });
+
+            /* start pipeline */
+            pipe.start();
+
+            /* push input buffer */
+            for (int i = 0; i < 15; i++) {
+                ByteBuffer input = TensorsData.allocateByteBuffer(4 * 10);
+
+                for (int j = 0; j < 10; j++) {
+                    input.putInt(j * 4, j);
+                }
+
+                TensorsData in = new TensorsData();
+                in.addTensorData(input);
+
+                pipe.inputData("srcx", in);
+                Thread.sleep(50);
+            }
+
+            pipe.close();
+
+            /* close custom-filter */
+            customPassthrough.close();
+            customConvert.close();
+            customAdd.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+            Log.e(TAG, e.getMessage());
+            isFailed = true;
+        }
+    }
 }