[ML][Pipeline] Implement CustomFilter callback 68/253768/15
authorPawel Wasowski <p.wasowski2@samsung.com>
Mon, 22 Feb 2021 10:07:11 +0000 (11:07 +0100)
committerPawel Wasowski <p.wasowski2@samsung.com>
Wed, 24 Feb 2021 16:26:40 +0000 (16:26 +0000)
ACR: TWDAPI-274

This is the second part of CustomFilter implementation. It adds
transfering the data between JS and C++.

[Verification] Code tested with the snippets below works fine

// Valid CustomFilter callback - the happy scenario
var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
    console.log("Custom filter called");
    var outputTD = outputTI.getTensorsData();
    var rawInputData = input.getTensorRawData(0);
    for (var i = 0; i < rawInputData.data.size; ++i) {
        rawInputData.data[i] = rawInputData.data[i] + 1;
    }
        outputTD.setTensorRawData(0, rawInputData.data);
        return new tizen.ml.CustomFilterOutput(0, outputTD);
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// PAUSED

pipeline.start()

// PLAYING
// <CustomFilter and SinkListener callbacks' outputs 3 times>

////////////////////////////////////////////////////////////

// Valid CustomFilter callback - the happy scenario; ignore the data

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
    console.log("Custom filter called");
        return new tizen.ml.CustomFilterOutput(1, null); // ignore data
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// Custom filter called
// Custom filter called
// PAUSED

pipeline.start()

// PLAYING

////////////////////////////////////////////////////////////

// Valid CustomFilter callback - CustomFilter returns an error

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
    console.log("Custom filter called");
        return new tizen.ml.CustomFilterOutput(-1, null);
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// PAUSED

pipeline.start()

// PLAYING

////////////////////////////////////////////////////////////

// Invalid CustomFilter callback output - status == 0, but no
TensorsData

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
    console.log("Custom filter called");

    return new tizen.ml.CustomFilterOutput(0, null);
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// custom filter error:
// {name: "AbortError", message: "CustomFilter has thrown exception:
// {'code':0, 'name':'InvalidValuesError' ..."}

////////////////////////////////////////////////////////////

// Invalid CustomFilter callback output - non-CustomFilterOutput

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
    console.log("Custom filter called");

    return 123;
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// custom filter error:
// {name: "TypeMismatchError",
// message: "The value returned from CustomFilter is not a
// CustomFilterOutput object"}

////////////////////////////////////////////////////////////
// CustomFilter callback returns TensorsData with dimensions other
// than specified in tizen.ml.pipeline.registerCustomFilter(...) call

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
    console.log("Custom filter called");

    var invalidOutputTI = new tizen.ml.TensorsInfo();
    invalidOutputTI.addTensorInfo('ti1', 'UINT8', [5, 2]);

    var outputTD = invalidOutputTI.getTensorsData();

    outputTD.setTensorRawData(0, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    return new tizen.ml.CustomFilterOutput(0, outputTD);
}

// register - the happy scenario
tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called

// custom filter error: {name: "InvalidValuesError",
// message: "Output's TensorsInfo is not equal to expected"}

pipeline.start()

////////////////////////////////////////////////////////////

// CustomFilter callback returns non-TensorsData as output

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
    console.log("Custom filter called");

    return new tizen.ml.CustomFilterOutput(0, "this should be TensorsData");
}

// register - the happy scenario
tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// custom filter error:
// {name: "AbortError", message: "CustomFilter has thrown exception: {
..."}

////////////////////////////////////////////////////////////

// Invalid CustomFilterOutput.status

new tizen.ml.CustomFilterOutput(666, null);
// InvalidValuesError,
//  message: "CustomFilterOutput.status === 1 is the only legal positive value"

Change-Id: Icf2edee853eb97dde54ecd83f00164b302aa29ca
Signed-off-by: Pawel Wasowski <p.wasowski2@samsung.com>
src/ml/js/ml_common.js
src/ml/js/ml_manager.js
src/ml/js/ml_pipeline.js
src/ml/ml_instance.cc
src/ml/ml_instance.h
src/ml/ml_pipeline.cc
src/ml/ml_pipeline.h
src/ml/ml_pipeline_custom_filter.cc
src/ml/ml_pipeline_custom_filter.h
src/ml/ml_pipeline_manager.cc
src/ml/ml_pipeline_manager.h

index ef54ca2..21c5dcc 100755 (executable)
@@ -17,6 +17,7 @@
 var privUtils_ = xwalk.utils;
 var validator_ = privUtils_.validator;
 var types_ = validator_.Types;
+var type_ = xwalk.utils.type;
 var native_ = new xwalk.utils.NativeManager(extension);
 
 var AbortError = new WebAPIException('AbortError', 'An unknown error occurred');
index 69d96a3..66e3935 100755 (executable)
@@ -78,3 +78,4 @@ MachineLearningManager.prototype.checkNNFWAvailability = function() {
 
 exports = new MachineLearningManager();
 exports.TensorsInfo = TensorsInfo;
+exports.CustomFilterOutput = CustomFilterOutput;
index e1b211a..5ace36b 100755 (executable)
@@ -720,6 +720,54 @@ var MachineLearningPipeline = function() {};
 MachineLearningPipeline.prototype.createPipeline = CreatePipeline;
 
 //Pipeline::registerCustomFilter() begin
+var CustomFilterOutput = function() {
+    validator_.isConstructorCall(this, CustomFilterOutput);
+
+    var args = validator_.validateArgs(arguments, [
+        {
+            name: 'status',
+            type: validator_.Types.LONG
+        },
+        {
+            name: 'data',
+            type: types_.PLATFORM_OBJECT,
+            values: TensorsData,
+            optional: true,
+            nullable: true
+        }
+    ]);
+
+    if (!args.has.data) {
+        args.data = null;
+    }
+
+    if (args.status > 0 && args.status !== 1) {
+        throw new WebAPIException(
+            WebAPIException.INVALID_VALUES_ERR,
+            'CustomFilterOutput.status === 1 is the only legal positive value'
+        );
+    }
+
+    if (args.status === 0 && args.data === null) {
+        throw new WebAPIException(
+            WebAPIException.INVALID_VALUES_ERR,
+            'CustomFilterOutput.data === null is illegal when ' +
+                'CustomFilterOutput.status === 0'
+        );
+    }
+
+    Object.defineProperties(this, {
+        status: {
+            enumerable: true,
+            value: args.status
+        },
+        data: {
+            enumerable: true,
+            value: args.data
+        }
+    });
+};
+
 var ValidRegisterCustomFilterExceptions = [
     'InvalidValuesError',
     'NotSupportedError',
@@ -727,6 +775,7 @@ var ValidRegisterCustomFilterExceptions = [
     'AbortError'
 ];
 
+var ValidCustomFilterOutputErrors = ['InvalidValuesError', 'AbortError'];
 MachineLearningPipeline.prototype.registerCustomFilter = function() {
     var args = validator_.validateArgs(arguments, [
         {
@@ -762,8 +811,107 @@ MachineLearningPipeline.prototype.registerCustomFilter = function() {
         outputTensorsInfoId: args.outputInfo._id
     };
 
+    /*
+     * CustomFilter processing has 4 stages (the description below assumes
+     * the typical scenario with no errors):
+     * 1. (C++) C++ callback is called by the native API with input data.
+     * The C++ callback clones the tensors data and associated info and
+     * sends it to JS.
+     * 2. (JS) customFilterWrapper is called with the input data from C++
+     * as one of its arguments. User-provided callback processes the data
+     * and the output is sent to C++ by a call of asynchronous function.
+     * 3. (C++) C++ callback is woken up and clones the output from user
+     * callback to native tensors data. It replies to JS with success/error.
+     * 4. (JS) If C++ responded with success, the operation stops.
+     * Otherwise, the error callback provided by the user is called.
+     */
     var customFilterWrapper = function(msg) {
-        // TODO: In the next commit
+        /*
+         * Check if stage 1. was successful.
+         */
+        if (native_.isFailure(msg)) {
+            native_.callIfPossible(args.errorCallback, customFilterErrorInJs);
+            return;
+        }
+
+        var inputData = new TensorsData(msg.tensorsDataId, msg.tensorsInfoId);
+        /*
+         * customFilterErrorInJs records errors caused by the CustomFilter callback
+         * provided by the user.
+         */
+        var customFilterErrorInJs = null;
+        var jsResponse = {
+            status: -1,
+            dataId: -1,
+            name: nativeArgs.name,
+            requestId: msg.requestId
+        };
+        var output = null;
+
+        try {
+            output = args.customFilter(inputData);
+        } catch (exception) {
+            customFilterErrorInJs = new WebAPIException(
+                WebAPIException.ABORT_ERR,
+                'CustomFilter has thrown exception: ' + xwalk.JSON.stringify(exception)
+            );
+        }
+
+        if (output instanceof CustomFilterOutput) {
+            jsResponse.status = output.status;
+            jsResponse.dataId = type_.isNullOrUndefined(output.data)
+                ? -1
+                : output.data._id;
+        } else if (customFilterErrorInJs === null) {
+            customFilterErrorInJs = new WebAPIException(
+                WebAPIException.TYPE_MISMATCH_ERR,
+                'The value returned from CustomFilter is not a CustomFilterOutput object'
+            );
+        }
+
+        /*
+         * Callback called in stage 4.
+         *
+         * It is used to process success/error messages that come from
+         * C++ (stage 3).
+         * It does not handle errors caused by the user-provided CustomFilter
+         * which we detect in JS.
+         */
+        function filterOutputCallback(result) {
+            if (native_.isSuccess(result)) {
+                return;
+            }
+
+            var error = native_.getErrorObjectAndValidate(
+                result,
+                ValidCustomFilterOutputErrors,
+                AbortError
+            );
+
+            native_.callIfPossible(args.errorCallback, error);
+        }
+
+        /*
+         * Entering stage 3.
+         */
+        var result = native_.call(
+            'MLPipelineManagerCustomFilterOutput',
+            jsResponse,
+            filterOutputCallback
+        );
+
+        if (customFilterErrorInJs) {
+            /*
+             * If we detect that user-provided CustomFilter callback caused
+             * any errors in JS, the C++ layer gets the message to stop the
+             * pipeline (status == -1) and does not reply to JS with errors.
+             * Thus, filterOutputCallback is not called and this is why we
+             * call the user-provided error callback from JS.
+             */
+            native_.callIfPossible(args.errorCallback, customFilterErrorInJs);
+        } else if (native_.isFailure(result)) {
+            filterOutputCallback(result);
+        }
     };
 
     if (native_.listeners_.hasOwnProperty(nativeArgs.listenerName)) {
index 0cdf5f5..9eb99b6 100644 (file)
@@ -55,6 +55,10 @@ const std::string kShape = "shape";
 const std::string kListenerName = "listenerName";
 const std::string kInputTensorsInfoId = "inputTensorsInfoId";
 const std::string kOutputTensorsInfoId = "outputTensorsInfoId";
+const std::string kStatus = "status";
+const std::string kDataId = "dataId";
+const std::string kRequestId = "requestId";
+const int kCustomFilterSuccess = 0;
 }  //  namespace
 
 using namespace common;
@@ -148,6 +152,7 @@ MlInstance::MlInstance()
   REGISTER_METHOD(MLPipelineRegisterSinkListener);
   REGISTER_METHOD(MLPipelineUnregisterSinkListener);
   REGISTER_METHOD(MLPipelineManagerRegisterCustomFilter);
+  REGISTER_METHOD(MLPipelineManagerCustomFilterOutput);
   REGISTER_METHOD(MLPipelineManagerUnregisterCustomFilter);
 // Pipeline API end
 
@@ -1238,6 +1243,32 @@ void MlInstance::MLPipelineManagerRegisterCustomFilter(const picojson::value& ar
 }
 // Pipeline::registerCustomFilter() end
 
+void MlInstance::MLPipelineManagerCustomFilterOutput(const picojson::value& args,
+                                                     picojson::object& out) {
+  ScopeLogger("args: %s", args.serialize().c_str());
+
+  CHECK_ARGS(args, kName, std::string, out);
+  CHECK_ARGS(args, kStatus, double, out);
+  CHECK_ARGS(args, kRequestId, double, out);
+  CHECK_ARGS(args, kDataId, double, out);
+  CHECK_ARGS(args, kCallbackId, double, out);
+
+  const auto& custom_filter_name = args.get(kName).get<std::string>();
+  auto status = static_cast<int>(args.get(kStatus).get<double>());
+  auto request_id = static_cast<int>(args.get(kRequestId).get<double>());
+  auto data_id = static_cast<int>(args.get(kDataId).get<double>());
+  auto callback_id = static_cast<int>(args.get(kCallbackId).get<double>());
+
+  auto ret = pipeline_manager_.CustomFilterOutput(custom_filter_name, request_id, status, data_id,
+                                                  callback_id);
+  if (!ret) {
+    LogAndReportError(ret, &out);
+    return;
+  }
+
+  ReportSuccess(out);
+}
+
 // Pipeline::unregisterCustomFilter() begin
 void MlInstance::MLPipelineManagerUnregisterCustomFilter(const picojson::value& args,
                                                          picojson::object& out) {
index 35c8322..88b71af 100644 (file)
@@ -153,6 +153,7 @@ class MlInstance : public common::ParsedInstance {
 
   // Pipeline::registerCustomFilter() begin
   void MLPipelineManagerRegisterCustomFilter(const picojson::value& args, picojson::object& out);
+  void MLPipelineManagerCustomFilterOutput(const picojson::value& args, picojson::object& out);
   // Pipeline::registerCustomFilter() end
 
   // Pipeline::unregisterCustomFilter() begin
index f3d408f..66b7756 100644 (file)
@@ -342,14 +342,6 @@ PlatformResult Pipeline::UnregisterSinkListener(const std::string& sink_name) {
 }
 // Pipeline::unregisterSinkCallback() end
 
-// Pipeline::registerCustomFilter() begin
-
-// Pipeline::registerCustomFilter() end
-
-// Pipeline::unregisterCustomFilter() begin
-
-// Pipeline::unregisterCustomFilter() end
-
 // NodeInfo::getProperty() begin
 PlatformResult Pipeline::getProperty(const std::string& node_name, const std::string& name,
                                      const std::string& type, picojson::object* property) {
index c529e6b..cc370ce 100644 (file)
@@ -26,6 +26,7 @@
 #include "common/extension.h"
 #include "common/picojson.h"
 #include "common/platform_result.h"
+#include "ml_pipeline_custom_filter.h"
 #include "ml_pipeline_nodeinfo.h"
 #include "ml_pipeline_sink.h"
 #include "ml_pipeline_source.h"
@@ -100,14 +101,6 @@ class Pipeline {
   PlatformResult UnregisterSinkListener(const std::string& sink_name);
   // Pipeline::unregisterSinkCallback() end
 
-  // Pipeline::registerCustomFilter() begin
-
-  // Pipeline::registerCustomFilter() end
-
-  // Pipeline::unregisterCustomFilter() begin
-
-  // Pipeline::unregisterCustomFilter() end
-
   // NodeInfo::getProperty() begin
   PlatformResult getProperty(const std::string& node_name, const std::string& name,
                              const std::string& type, picojson::object* property);
index 43d5576..53e7891 100644 (file)
@@ -15,6 +15,7 @@
  */
 
 #include <nnstreamer/nnstreamer.h>
+#include <atomic>
 #include <utility>
 
 #include "common/tools.h"
 
 using common::ErrorCode;
 using common::PlatformResult;
+using common::tools::ReportError;
+using common::tools::ReportSuccess;
 
 namespace {
 
+const std::string kCallbackId = "callbackId";
 const std::string kListenerId = "listenerId";
+const std::string kRequestId = "requestId";
+const std::string kTensorsDataId = "tensorsDataId";
+const std::string kTensorsInfoId = "tensorsInfoId";
 
 }  //  namespace
 
@@ -34,6 +41,44 @@ namespace extension {
 namespace ml {
 namespace pipeline {
 
+const int CustomFilter::kCustomFilterError = -1;
+const int CustomFilter::kCustomFilterIgnoreData = 1;
+const int CustomFilter::kCustomFilterSuccess = 0;
+
+CustomFilter::JSResponse::JSResponse(int status, int callback_id, TensorsData* tensors_data_ptr,
+                                     TensorsDataManager* tensors_data_manager_ptr,
+                                     TensorsInfoManager* tensors_info_manager_ptr)
+    : status{status},
+      callback_id{callback_id},
+      tensors_data_ptr{tensors_data_ptr},
+      tensors_data_manager_ptr{tensors_data_manager_ptr},
+      tensors_info_manager_ptr{tensors_info_manager_ptr} {
+  ScopeLogger("status: [%d], callback_id: [%d], tensors_data_ptr: %p]", status, callback_id,
+              tensors_data_ptr);
+}
+
+CustomFilter::JSResponse::~JSResponse() {
+  ScopeLogger("status: [%d], callback_id: [%d], tensors_data_ptr: [%p]", status, callback_id,
+              tensors_data_ptr);
+
+  if (!tensors_data_ptr) {
+    return;
+  }
+  // We ignore errors, because we can't do anything about them and these methods
+  // will log error messages
+  tensors_info_manager_ptr->DisposeTensorsInfo(tensors_data_ptr->TensorsInfoId());
+  tensors_data_manager_ptr->DisposeTensorsData(tensors_data_ptr);
+}
+
+CustomFilter::JSResponse::JSResponse(JSResponse&& other)
+    : status{other.status},
+      callback_id{other.callback_id},
+      tensors_data_ptr{other.tensors_data_ptr},
+      tensors_data_manager_ptr{other.tensors_data_manager_ptr},
+      tensors_info_manager_ptr{other.tensors_info_manager_ptr} {
+  other.tensors_data_ptr = nullptr;
+}
+
 PlatformResult CustomFilter::CreateAndRegisterCustomFilter(
     const std::string& name, const std::string& listener_name, TensorsInfo* input_tensors_info_ptr,
     TensorsInfo* output_tensors_info_ptr, common::Instance* instance_ptr,
@@ -141,12 +186,183 @@ PlatformResult CustomFilter::Unregister() {
   return PlatformResult{};
 }
 
+void CustomFilter::NotifyAboutJSResponse(int request_id, int status, int callback_id,
+                                         TensorsData* tensors_data_ptr) {
+  ScopeLogger("request_id: [%d], status: [%d], callback_id: [%d], tensors_data_ptr: [%p]",
+              request_id, status, callback_id, tensors_data_ptr);
+
+  std::lock_guard<std::mutex>{request_id_to_js_response_mutex_};
+  request_id_to_js_response_.emplace(
+      request_id, JSResponse{status, callback_id, tensors_data_ptr, tensors_data_manager_ptr_,
+                             tensors_info_manager_ptr_});
+  cv_.notify_all();
+}
+
+int CustomFilter::getRequestId() {
+  ScopeLogger();
+
+  static std::atomic_int data_id{0};
+  return data_id++;
+}
+
+bool CustomFilter::PrepareMessageWithInputData(const ml_tensors_data_h input_tensors_data,
+                                               picojson::value* out_message, int* out_request_id) {
+  ScopeLogger("input_tensors_data: [%p]", input_tensors_data);
+
+  auto& message_obj = out_message->get<picojson::object>();
+  message_obj[kListenerId] = picojson::value{listener_name_};
+
+  /*
+   * According to native API devs, the custom filter callback runs in the thread other than
+   * the main.
+   *
+   * As we put the thread, in which the callback is called, to sleep, we test if this assumption
+   * is correct - if not, going to sleep in the main thread would stop the app.
+   */
+  const auto listener_thread_id = std::this_thread::get_id();
+  if (main_thread_id_ == listener_thread_id) {
+    LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
+                      &message_obj, ("CustomFilterListener is called from the main thread"));
+    return false;
+  }
+
+  auto* input_tensors_data_clone_ptr = tensors_info_manager_ptr_->CloneNativeTensorWithData(
+      input_tensors_info_ptr_->Handle(), input_tensors_data);
+  if (!input_tensors_data_clone_ptr) {
+    LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
+                      &message_obj,
+                      ("Could not clone tensors data. Custom filter won't be triggered."));
+    return false;
+  }
+
+  message_obj[kTensorsDataId] =
+      picojson::value{static_cast<double>(input_tensors_data_clone_ptr->Id())};
+  message_obj[kTensorsInfoId] =
+      picojson::value{static_cast<double>(input_tensors_data_clone_ptr->TensorsInfoId())};
+  *out_request_id = getRequestId();
+  message_obj[kRequestId] = picojson::value{static_cast<double>(*out_request_id)};
+
+  return true;
+}
+
+int CustomFilter::CopyJsFilterOutputToNativeObject(int request_id, const JSResponse& js_response,
+                                                   ml_tensors_data_h output_tensors_data,
+                                                   picojson::value* out_response_to_js) {
+  ScopeLogger("request_id: [%d]", request_id);
+
+  auto& response_to_js_obj = out_response_to_js->get<picojson::object>();
+  response_to_js_obj[kCallbackId] = picojson::value{static_cast<double>(js_response.callback_id)};
+
+  int custom_filter_status = kCustomFilterError;
+  if (kCustomFilterIgnoreData == js_response.status || js_response.status < 0) {
+    /*
+     * Although js_response.status < 0 means "error", we respond with "success" message
+     * to JS, because this status came from JS and the problem, if any, is already handled there.
+     */
+    ReportSuccess(response_to_js_obj);
+    custom_filter_status = js_response.status;
+  } else if (kCustomFilterSuccess == js_response.status) {
+    auto* js_response_tensors_info_ptr =
+        tensors_info_manager_ptr_->GetTensorsInfo(js_response.tensors_data_ptr->TensorsInfoId());
+
+    if (!js_response_tensors_info_ptr) {
+      LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
+                        &response_to_js_obj,
+                        ("Could not get tensors info. Custom filter won't be triggered."));
+      return kCustomFilterError;
+    }
+
+    if (!output_tensors_info_ptr_->Equals(js_response_tensors_info_ptr)) {
+      LogAndReportError(PlatformResult(ErrorCode::INVALID_VALUES_ERR,
+                                       "Output's TensorsInfo is not equal to expected"),
+                        &response_to_js_obj);
+      return kCustomFilterError;
+    }
+
+    auto tensors_count = js_response_tensors_info_ptr->Count();
+    for (int i = 0; i < tensors_count; ++i) {
+      void* data = nullptr;
+      size_t data_size = 0;
+      auto ret = ml_tensors_data_get_tensor_data(js_response.tensors_data_ptr->Handle(), i, &data,
+                                                 &data_size);
+      if (ML_ERROR_NONE != ret) {
+        LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
+                          &response_to_js_obj,
+                          ("ml_tensors_data_get_tensor_data() failed: [%d] (%s), i: [%d]", ret,
+                           get_error_message(ret), i));
+        return kCustomFilterError;
+      }
+
+      ret = ml_tensors_data_set_tensor_data(output_tensors_data, i, data, data_size);
+      if (ML_ERROR_NONE != ret) {
+        LogAndReportError(PlatformResult(ErrorCode::ABORT_ERR, "Internal CustomFilter error"),
+                          &response_to_js_obj,
+                          ("ml_tensors_data_set_tensor_data() failed: [%d] (%s), i: [%d]", ret,
+                           get_error_message(ret), i));
+        return kCustomFilterError;
+      }
+    }
+
+    custom_filter_status = kCustomFilterSuccess;
+  } else {
+    ReportError(PlatformResult{ErrorCode::ABORT_ERR, "Internal Customfilter error"},
+                &response_to_js_obj);
+    custom_filter_status = kCustomFilterError;
+  }
+
+  return custom_filter_status;
+}
+
 int CustomFilter::CustomFilterListener(const ml_tensors_data_h input_tensors_data,
                                        ml_tensors_data_h output_tensors_data, void* user_data) {
   ScopeLogger("input_tensors_data: [%p], tensors_info_out: [%p], user_data: [%p]",
               input_tensors_data, output_tensors_data, user_data);
-  // TODO: in next commit
-  return -1;
+
+  if (!user_data) {
+    LoggerE("user_data is a nullptr");
+    return -1;
+  }
+
+  CustomFilter* custom_filter_ptr = static_cast<CustomFilter*>(user_data);
+
+  picojson::value message{picojson::object{}};
+  int request_id = -1;
+  auto success =
+      custom_filter_ptr->PrepareMessageWithInputData(input_tensors_data, &message, &request_id);
+
+  std::unique_lock<std::mutex> lock{custom_filter_ptr->request_id_to_js_response_mutex_};
+  common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, message);
+
+  if (!success) {
+    LoggerE("An error occurred during input processing. Pipeline will stop");
+    return kCustomFilterError;
+  }
+
+  /*
+   * Stage 1. of data processing (see the comment above customFilterWrapper in
+   * ml_pipeline.js) ends here.
+   *
+   * We put this thread to sleep and wait for the response from JS
+   * (the main thread).
+   */
+  custom_filter_ptr->cv_.wait(lock, [custom_filter_ptr, request_id]() {
+    return custom_filter_ptr->request_id_to_js_response_.count(request_id) != 0;
+  });
+
+  /*
+   * Stage 3. of data processing starts here.
+   */
+  auto js_response{std::move(custom_filter_ptr->request_id_to_js_response_[request_id])};
+  custom_filter_ptr->request_id_to_js_response_.erase(request_id);
+  lock.unlock();
+
+  picojson::value response_to_js{picojson::object{}};
+  auto custom_filter_status = custom_filter_ptr->CopyJsFilterOutputToNativeObject(
+      request_id, js_response, output_tensors_data, &response_to_js);
+
+  common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, response_to_js);
+
+  return custom_filter_status;
 }
 
 }  // namespace pipeline
index 8f86ea4..ad953fd 100644 (file)
@@ -17,7 +17,9 @@
 #ifndef ML_ML_PIPELINE_CUSTOM_FILTER_H_
 #define ML_ML_PIPELINE_CUSTOM_FILTER_H_
 
+#include <condition_variable>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <thread>
 #include <unordered_map>
@@ -38,6 +40,11 @@ namespace pipeline {
 
 class CustomFilter {
  public:
+  /*
+   * WARNING: this function MUST be called from
+   * the main application thread - it retrieves thread id
+   * that is saved to main_thread_id_ of the new object.
+   */
   static PlatformResult CreateAndRegisterCustomFilter(
       const std::string& name, const std::string& listener_name,
       TensorsInfo* input_tensors_info_ptr, TensorsInfo* output_tensors_info_ptr,
@@ -51,6 +58,13 @@ class CustomFilter {
   CustomFilter(const CustomFilter&) = delete;
   CustomFilter& operator=(const CustomFilter&) = delete;
 
+  void NotifyAboutJSResponse(int request_id, int status, int callback_id,
+                             TensorsData* tensors_data_ptr);
+
+  static const int kCustomFilterError;
+  static const int kCustomFilterIgnoreData;
+  static const int kCustomFilterSuccess;
+
  private:
   CustomFilter(const std::string& name, const std::string& listener_name,
                TensorsInfo* input_tensors_info, TensorsInfo* output_tensors_info,
@@ -61,6 +75,45 @@ class CustomFilter {
   static int CustomFilterListener(const ml_tensors_data_h tensors_data_in,
                                   ml_tensors_data_h tensors_data_out, void* user_data);
 
+  struct JSResponse {
+    JSResponse() = default;
+    JSResponse(int status, int callback_id, TensorsData* tensors_data_ptr,
+               TensorsDataManager* tensors_data_manager_ptr,
+               TensorsInfoManager* tensors_info_manager_ptr);
+    JSResponse(JSResponse&& other);
+
+    ~JSResponse();
+
+    JSResponse(const JSResponse&) = delete;
+    JSResponse& operator=(const JSResponse&) = delete;
+
+    int status;
+    int callback_id;
+
+    TensorsData* tensors_data_ptr = nullptr;
+
+    // We need these managers to properly dispose
+    // tensors_data_ptr and the associated TensorsInfo object
+    TensorsDataManager* tensors_data_manager_ptr;
+    TensorsInfoManager* tensors_info_manager_ptr;
+  };
+
+  /*
+   * Returns "false" if any error occurs and "true" otherwise.
+   */
+  bool PrepareMessageWithInputData(const ml_tensors_data_h input_tensors_data,
+                                   picojson::value* out_message, int* out_request_id);
+
+  /*
+   * Returns the value to be returned from CustomFilter, which
+   * implements ml_custom_easy_invoke_cb;
+   */
+  int CopyJsFilterOutputToNativeObject(int request_id, const JSResponse& js_response,
+                                       ml_tensors_data_h output_tensors_data,
+                                       picojson::value* out_response_to_js);
+
+  int getRequestId();
+
   const std::string name_;
   const std::string listener_name_;
   TensorsInfo* input_tensors_info_ptr_;
@@ -70,6 +123,9 @@ class CustomFilter {
   TensorsInfoManager* tensors_info_manager_ptr_;
   TensorsDataManager* tensors_data_manager_ptr_;
 
+  std::mutex request_id_to_js_response_mutex_;
+  std::unordered_map<int, JSResponse> request_id_to_js_response_;
+  std::condition_variable cv_;
   std::thread::id main_thread_id_;
 };
 
index 7ed62b5..4278e19 100644 (file)
@@ -256,6 +256,60 @@ PlatformResult PipelineManager::RegisterCustomFilter(const std::string& custom_f
 }
 // Pipeline::registerCustomFilter() end
 
+PlatformResult PipelineManager::CustomFilterOutput(const std::string& custom_filter_name,
+                                                   int request_id, int status, int data_id,
+                                                   int callback_id) {
+  ScopeLogger(
+      "custom_filter_name: [%s], request_id: [%d], status: [%d], data_id: [%d], callback_id: [%d]",
+      custom_filter_name.c_str(), request_id, status, data_id, callback_id);
+
+  auto filter_it = custom_filters_.find(custom_filter_name);
+  if (custom_filters_.end() == filter_it) {
+    LoggerE("CustomFilter [%s] not found", custom_filter_name.c_str());
+    return PlatformResult{ErrorCode::ABORT_ERR, "Internal CustomFilter error"};
+  }
+
+  if (CustomFilter::kCustomFilterSuccess != status) {
+    filter_it->second->NotifyAboutJSResponse(request_id, status, callback_id, nullptr);
+    return PlatformResult{};
+  }
+
+  auto* output_from_js_tensors_data = tensors_data_manager_->GetTensorsData(data_id);
+  if (!output_from_js_tensors_data) {
+    LoggerE("Could not get TensorsData: [%d]", data_id);
+    filter_it->second->NotifyAboutJSResponse(request_id, CustomFilter::kCustomFilterError,
+                                             callback_id, nullptr);
+    return PlatformResult{ErrorCode::ABORT_ERR, "Internal CustomFilter error"};
+  }
+
+  auto* output_from_js_tensors_info =
+      tensors_info_manager_->GetTensorsInfo(output_from_js_tensors_data->TensorsInfoId());
+  if (!output_from_js_tensors_info) {
+    LoggerE("Could not get TensorsInfo: [%d]", output_from_js_tensors_data->TensorsInfoId());
+    filter_it->second->NotifyAboutJSResponse(request_id, CustomFilter::kCustomFilterError,
+                                             callback_id, nullptr);
+    return PlatformResult{ErrorCode::ABORT_ERR, "Internal CustomFilter error"};
+  }
+
+  /*
+   * We clone this tensors data to be sure, that the user won't dispose it before it will be cloned
+   * by CustomFilter::CustomFilterListener.
+   */
+  auto* output_from_js_tensors_data_clone = tensors_info_manager_->CloneNativeTensorWithData(
+      output_from_js_tensors_info->Handle(), output_from_js_tensors_data->Handle());
+  if (!output_from_js_tensors_data_clone) {
+    LoggerE("Could not clone TensorsData: [%d] with TensorsInfo: [%d]", data_id,
+            output_from_js_tensors_info->Id());
+    filter_it->second->NotifyAboutJSResponse(request_id, CustomFilter::kCustomFilterError,
+                                             callback_id, nullptr);
+    return PlatformResult{ErrorCode::ABORT_ERR, "Internal CustomFilter error"};
+  }
+
+  filter_it->second->NotifyAboutJSResponse(request_id, status, callback_id,
+                                           output_from_js_tensors_data_clone);
+  return PlatformResult{};
+}
+
 // Pipeline::unregisterCustomFilter() begin
 PlatformResult PipelineManager::UnregisterCustomFilter(const std::string& custom_filter_name) {
   ScopeLogger("custom_filter_name: [%s]", custom_filter_name.c_str());
index e490f82..66c3e14 100644 (file)
@@ -93,6 +93,8 @@ class PipelineManager {
                                       TensorsInfo* input_tensors_info_ptr,
                                       TensorsInfo* output_tensors_info_ptr);
 
+  PlatformResult CustomFilterOutput(const std::string& custom_filter_name, int request_id,
+                                    int status, int data_id, int callback_id);
   // Pipeline::registerCustomFilter() end
 
   // Pipeline::unregisterCustomFilter() begin