[ML][Pipeline] Prevent deadlock in CustomFilter 44/255244/8
authorPawel Wasowski <p.wasowski2@samsung.com>
Mon, 15 Mar 2021 15:52:07 +0000 (16:52 +0100)
committerPawel Wasowski <p.wasowski2@samsung.com>
Thu, 18 Mar 2021 08:27:47 +0000 (08:27 +0000)
ACR: TWDAPI-274

A deadlock could happen in 2 scenarios:
1. An attempt to unregister a CustomFilter from its own callback (i.e.
calling tizen.ml.pipeline.unregisterCustomFilter('xxx') from xxx's
CustomFilter callback).
2. An attempt to dispose a pipeline while a CustomFilter it uses is
currently processing data.
This commit fixes both problems.

[Verification] Tested in Chrome DevTools with the snippets below, works
fine.

inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo("3D", "UINT8", [4, 20, 15, 1]);
outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo("flat", "UINT8", [1200]);

filterCB = function (input, output) {
  console.log('hello');
  tizen.ml.pipeline.unregisterCustomFilter("flattenFilter");
  console.log('bye');
}

retValue = tizen.ml.pipeline.registerCustomFilter("flattenFilter", filterCB,
                                                  inputTI, outputTI,
                                                  console.warn);

pipelineDefinition = "videotestsrc num-buffers=3 " +
                    "! video/x-raw,width=20,height=15,format=BGRA " +
                    "! tensor_converter " +
                    "! tensor_filter framework=custom-easy model=flattenFilter "
                    + "! fakesink";
pipeline = tizen.ml.pipeline.createPipeline(pipelineDefinition);
pipeline.start();

// hello
// WebAPIException {name: "AbortError", message:
// "CustomFilter has thrown exception: InvalidStateError: The custom filter
// is processing data now. Stop the pipeline to unregister the filter."}

// <no deadlock>

///////////////////////////////////////////////////////////////////////////
inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo("3D", "UINT8", [4, 20, 15, 1]);
outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo("ti1", "UINT8", [1200]);

customFilter = function (input, output) {
  try {
      inputTI.dispose();
      outputTI.dispose();
      pipeline.stop();
      pipeline.dispose();
  } catch (err) {
      console.warn(err);
  }
}

tizen.ml.pipeline.registerCustomFilter("flattenFilter", customFilter, inputTI, outputTI);

pipelineDefinition = "videotestsrc num-buffers=3 " +
                    "! video/x-raw,width=20,height=15,format=BGRA " +
                    "! tensor_converter " +
                    "! tensor_filter framework=custom-easy model=flattenFilter " +
                    "! fakesink";
pipeline = tizen.ml.pipeline.createPipeline(pipelineDefinition);
pipeline.start();

// WebAPIException {name: "InvalidStateError",
// message: "Pipeline cannot be disposed when at least one custom filter
// is currently processing data.",

// <no deadlock>

///////////////////////////////////////////////////////////////////////////
// Valid CustomFilter callback - the happy scenario
var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenAndSet123 = function(input, output) {
    console.log("Custom filter called");

    var rawOutputData = new Uint8Array(1200);
    for (var i = 0; i < rawOutputData.length; ++i) {
        rawOutputData[i] = 123;
    }

    output.setTensorRawData(0, rawOutputData);
    return 0;
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenAndSet123, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// PAUSED

pipeline.start()

// PLAYING
// <CustomFilter and SinkListener callbacks' outputs 3 times>

////////////////////////////////////////////////////////////

// Valid CustomFilter callback - the happy scenario; ignore the data

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input, output) {
    console.log("Custom filter called");
        return 1; // ignore data
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// Custom filter called
// Custom filter called
// PAUSED

pipeline.start()

// PLAYING

////////////////////////////////////////////////////////////

// Valid CustomFilter callback - CustomFilter returns an error

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input, output) {
    console.log("Custom filter called");
        return -1;
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// READY
// Custom filter called
// PAUSED

pipeline.start()

// PLAYING

////////////////////////////////////////////////////////////

// Invalid CustomFilterOutput.status
var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
    console.log("Custom filter called");

    return 123;
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

// InvalidValuesError,
//  message: "CustomFilterOutput.status === 1 is the only legal positive value"

////////////////////////////////////////////////////////////
// Check if {input, output}.dispose() and input.setTensorRawData()
// have any effect

var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenAndSet123 = function(input, output) {
    console.log("Custom filter called");

    // dispose should have no effect
    input.dispose();
    console.log('input count: ' + input.tensorsInfo.count);
    // dispose should have no effect
    input.dispose();
    console.log('output count: ' + output.tensorsInfo.count);

    var rawOutputData = new Uint8Array(1200);
    for (var i = 0; i < rawOutputData.length; ++i) {
        rawOutputData[i] = 123;
    }

    output.setTensorRawData(0, rawOutputData);

    // this call should have no effect
    input.setTensorRawData(0, rawOutputData);

    return 0;
}

tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenAndSet123, inputTI,
    outputTI, function errorCallback(error) {
        console.warn('custom filter error:') ; console.warn(error);
    });

var pipeline_def = "videotestsrc num-buffers=3 "
                   + "! video/x-raw,width=20,height=15,format=BGRA "
                   + "! tensor_converter "
                   + "! tensor_filter framework=custom-easy model=testfilter2 "
                   + "! appsink name=mysink";

var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
                                                state => {console.log(state);})

pipeline.registerSinkListener('mysink', function(sinkName, data) {
    console.log('SinkListener for "' + sinkName + '" sink called');
    console.log(data);
})

Change-Id: Id6cda7782e3065248b2f2c5f859ca2af07c108a6
Signed-off-by: Pawel Wasowski <p.wasowski2@samsung.com>
src/ml/js/ml_pipeline.js
src/ml/ml_pipeline.cc
src/ml/ml_pipeline_custom_filter.cc
src/ml/ml_pipeline_custom_filter.h

index b7204bf..a07accd 100755 (executable)
@@ -19,7 +19,12 @@ var kSinkListenerNamePrefix = 'MLPipelineSinkListener';
 var kCustomFilterListenerNamePrefix = 'MLPipelineCustomFilterListener';
 
 //PipelineManager::createPipeline() begin
-var ValidPipelineDisposeExceptions = ['NotFoundError', 'NotSupportedError', 'AbortError'];
+var ValidPipelineDisposeExceptions = [
+    'InvalidStateError',
+    'NotFoundError',
+    'NotSupportedError',
+    'AbortError'
+];
 
 var nextPipelineId = 1;
 function NextPipelineId() {
@@ -915,6 +920,7 @@ MachineLearningPipeline.prototype.registerCustomFilter = function() {
 
 //Pipeline::unregisterCustomFilter() begin
 var ValidUnregisterCustomFilterExceptions = [
+    'InvalidStateError',
     'InvalidValuesError',
     'NotSupportedError',
     'AbortError'
index 0719a2d..1bf6b74 100644 (file)
  *    limitations under the License.
  */
 #include <tizen.h>
+#include <chrono>
+#include <mutex>
+#include <thread>
 
 #include "common/logger.h"
 #include "common/picojson.h"
 #include "ml_pipeline.h"
+#include "ml_pipeline_custom_filter.h"
 #include "ml_pipeline_sink.h"
 #include "ml_utils.h"
 
@@ -120,7 +124,28 @@ Pipeline::~Pipeline() {
     return;
   }
 
-  Dispose();
+  auto disposed_successfully = PlatformResult{ErrorCode::ABORT_ERR};
+
+  /*
+   * Dispose() may fail if at least one CustomFilter is processing
+   * data. We'll make 5 attempts to increase the chance of successful
+   * disposal.
+   * It should succeed for almost all cases, but in principle it may fail.
+   */
+  for (int i = 0; !disposed_successfully && i < 5; ++i) {
+    disposed_successfully = Dispose();
+    if (!disposed_successfully) {
+      LoggerD("Disposal attempt number %d failed", i);
+      using namespace std::chrono_literals;
+      std::this_thread::sleep_for(500ms);
+    }
+  }
+
+  if (disposed_successfully) {
+    LoggerD("Successfully disposed the pipeline.");
+  } else {
+    LoggerE("Could not dispose the pipeline.");
+  }
 }
 
 void Pipeline::PipelineStateChangeListener(ml_pipeline_state_e state, void* user_data) {
@@ -184,6 +209,31 @@ PlatformResult Pipeline::Dispose() {
   ScopeLogger("id_: [%d]", id_);
 
   /*
+   * A deadlock (in nnstreamer implementation) happens in the following scenario:
+   * - CustomFilter::CustomFilterListener() is waiting for response from
+   * JS (stages 2-4 of data processing, as described in comments in ml_pipeline_api.js)
+   * - ml_pipeline_destroy() is then called from a different thread (main thread
+   * in JS apps).
+   *
+   * If not prevented here, it could occur when pipeline.dispose() would have
+   * been called from CustomFilter callback in JS.
+   *
+   * To avoid this, we check if any filter is processing data (there's no
+   * easy way to check only those filters that are used in this pipeline) and
+   * stop them from entering the data processing stage for a moment. Only then
+   * do we call ml_pipeline_destroy().
+   *
+   * When filter_locks is destroyed, CustomFilter callbacks will resume.
+   */
+  std::vector<std::unique_lock<std::mutex>> filter_locks;
+  auto can_continue_dispose = CustomFilter::TryLockAllFilters(&filter_locks);
+  if (!can_continue_dispose) {
+    return PlatformResult{ErrorCode::INVALID_STATE_ERR,
+                          "Pipeline cannot be disposed when at least one custom filter is "
+                          "currently processing data."};
+  }
+
+  /*
    * TODO in future commits:
    *
    * Release all nodes belonging to this pipeline and
index fc54c93..710481b 100644 (file)
@@ -48,6 +48,9 @@ const int CustomFilter::kCustomFilterError = -1;
 const int CustomFilter::kCustomFilterIgnoreData = 1;
 const int CustomFilter::kCustomFilterSuccess = 0;
 
+std::mutex CustomFilter::valid_filters_mutex_ = {};
+std::unordered_set<CustomFilter*> CustomFilter::valid_filters_ = {};
+
 PlatformResult CustomFilter::CreateAndRegisterCustomFilter(
     const std::string& name, const std::string& listener_name, TensorsInfo* input_tensors_info_ptr,
     TensorsInfo* output_tensors_info_ptr, common::Instance* instance_ptr,
@@ -116,12 +119,16 @@ CustomFilter::CustomFilter(const std::string& name, const std::string& listener_
       instance_ptr_{instance_ptr},
       tensors_info_manager_ptr_{tensors_info_manager_ptr},
       tensors_data_manager_ptr_{tensors_data_manager_ptr},
+      threads_waiting_for_js_responses_{0},
       main_thread_id_{main_thread_id} {
   ScopeLogger(
       "name_: [%s], listener_name_: [%s], input_tensors_info::id: [%d], "
       "output_tensors_info::id: [%d]",
       name_.c_str(), listener_name_.c_str(), input_tensors_info_ptr_->Id(),
       output_tensors_info_ptr_->Id());
+
+  std::lock_guard<std::mutex> lock{CustomFilter::valid_filters_mutex_};
+  CustomFilter::valid_filters_.insert(this);
 }
 
 CustomFilter::~CustomFilter() {
@@ -142,6 +149,20 @@ PlatformResult CustomFilter::Unregister() {
     return PlatformResult{};
   }
 
+  std::lock_guard<std::mutex> valid_filters_lock{CustomFilter::valid_filters_mutex_};
+  /*
+   * We ensure that no thread is currently waiting for a
+   * response - if we tried to unregister a filter whose thread is
+   * currently sleeping, we could cause a deadlock.
+   */
+  auto request_id_to_js_response_lock =
+      LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+  if (!request_id_to_js_response_lock) {
+    return PlatformResult{
+        ErrorCode::INVALID_STATE_ERR,
+        "The custom filter is processing data now. Stop the pipeline to unregister the filter."};
+  }
+
   auto ret = ml_pipeline_custom_easy_filter_unregister(custom_filter_);
   if (ML_ERROR_NONE != ret) {
     LoggerE("ml_pipeline_custom_easy_filter_unregister() failed: [%d] (%s)", ret,
@@ -150,6 +171,8 @@ PlatformResult CustomFilter::Unregister() {
   }
   LoggerD("ml_pipeline_custom_easy_filter_unregister() succeeded");
 
+  CustomFilter::valid_filters_.erase(this);
+
   custom_filter_ = nullptr;
 
   return PlatformResult{};
@@ -242,6 +265,12 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens
 
   CustomFilter* custom_filter_ptr = static_cast<CustomFilter*>(user_data);
 
+  auto lock = CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsValid(custom_filter_ptr);
+  if (!lock) {
+    LoggerD("This custom filter is invalid. Pipeline will stop.");
+    return kCustomFilterError;
+  }
+
   picojson::value message{picojson::object{}};
   int request_id = -1;
   TensorsData *input_tensors_data_ptr = nullptr, *output_tensors_data_ptr = nullptr;
@@ -255,7 +284,6 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens
       native_input_tensors_data_handle, native_output_tensors_data_handle, &message, &request_id,
       &input_tensors_data_ptr, &output_tensors_data_ptr);
 
-  std::unique_lock<std::mutex> lock{custom_filter_ptr->request_id_to_js_response_mutex_};
   common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, message);
 
   if (!success) {
@@ -263,6 +291,8 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens
     return kCustomFilterError;
   }
 
+  custom_filter_ptr->threads_waiting_for_js_responses_++;
+
   /*
    * Stage 1. of data processing (see the comment above customFilterWrapper in
    * ml_pipeline.js) ends here.
@@ -279,11 +309,63 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens
    */
   auto js_response_status = custom_filter_ptr->request_id_to_js_response_status_[request_id];
   custom_filter_ptr->request_id_to_js_response_status_.erase(request_id);
-  lock.unlock();
+
+  custom_filter_ptr->threads_waiting_for_js_responses_--;
 
   return js_response_status;
 }
 
+std::unique_lock<std::mutex>
+CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses() {
+  ScopeLogger("name_: [%s], listener_name_: [%s]", name_.c_str(), listener_name_.c_str());
+
+  std::unique_lock<std::mutex> lock{request_id_to_js_response_mutex_};
+  LoggerD("threads_waiting_for_js_responses_: [%d], lock acquired: [%s]",
+          threads_waiting_for_js_responses_,
+          0 == threads_waiting_for_js_responses_ ? "true" : "false");
+  if (0 == threads_waiting_for_js_responses_) {
+    return lock;
+  }
+
+  return {};
+}
+
+std::unique_lock<std::mutex> CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsValid(
+    CustomFilter* custom_filter_ptr) {
+  ScopeLogger();
+
+  std::lock_guard<std::mutex> lock{CustomFilter::valid_filters_mutex_};
+  if (CustomFilter::valid_filters_.count(custom_filter_ptr)) {
+    LoggerD("Filter is valid");
+    return std::unique_lock<std::mutex>{custom_filter_ptr->request_id_to_js_response_mutex_};
+  }
+
+  LoggerD("Filter is invalid");
+  return {};
+}
+
+bool CustomFilter::TryLockAllFilters(
+    std::vector<std::unique_lock<std::mutex>>* output_filter_locks) {
+  ScopeLogger();
+
+  std::lock_guard<std::mutex> valid_filters_lock{CustomFilter::valid_filters_mutex_};
+
+  for (auto* filter : CustomFilter::valid_filters_) {
+    auto filter_lock = filter->LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+    if (filter_lock) {
+      LoggerD("[%s] CustomFilter locked", filter->name_.c_str());
+      output_filter_locks->push_back(std::move(filter_lock));
+    } else {
+      LoggerD("[%s] CustomFilter is waiting for JS responses and cannot be locked",
+              filter->name_.c_str());
+      output_filter_locks->clear();
+      return false;
+    }
+  }
+
+  return true;
+}
+
 }  // namespace pipeline
 }  // namespace ml
 }  // namespace extension
\ No newline at end of file
index 5210fd4..33f9d21 100644 (file)
@@ -64,6 +64,16 @@ class CustomFilter {
   static const int kCustomFilterIgnoreData;
   static const int kCustomFilterSuccess;
 
+  /*
+   * If any filter is currently processing data, this function will
+   * return "false" and an empty vector pointed to by output_filter_locks.
+   * If no filter is currently processing data, this function will return
+   * "true" and will lock request_id_to_js_response_mutex_ of each
+   * valid custom filter to prevent from executing JS code from
+   * within JS custom filter callbacks.
+   */
+  static bool TryLockAllFilters(std::vector<std::unique_lock<std::mutex>>* output_filter_locks);
+
  private:
   CustomFilter(const std::string& name, const std::string& listener_name,
                TensorsInfo* input_tensors_info, TensorsInfo* output_tensors_info,
@@ -85,6 +95,14 @@ class CustomFilter {
 
   int getRequestId();
 
+  /*
+   * If no CustomFilter thread is sleeping, waiting for responses from JS,
+   * this function will return a locked request_id_to_js_response_mutex_,
+   * thus preventing CustomFilterListener called in newly spawned threads
+   * from incrementing threads_waiting_for_js_responses_.
+   */
+  std::unique_lock<std::mutex> LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+
   const std::string name_;
   const std::string listener_name_;
   TensorsInfo* input_tensors_info_ptr_;
@@ -96,8 +114,22 @@ class CustomFilter {
 
   std::mutex request_id_to_js_response_mutex_;
   std::unordered_map<int, int> request_id_to_js_response_status_;
-  std::condition_variable cv_;
+  /*
+   * We keep track of the number of threads waiting for JS responses
+   * because Unregister() MUSTN'T be called when at least one thread
+   * is sleeping (calling it could result in a deadlock).
+   *
+   * This variable should only be accessed by a thread that currently
+   * locks request_id_to_js_response_mutex_.
+   */
+  int threads_waiting_for_js_responses_;
+  std::condition_variable cv_;  // used with request_id_to_js_response_mutex_
+
   std::thread::id main_thread_id_;
+
+  static std::unique_lock<std::mutex> LockRequestIdToJSResponseMutexIfFilterIsValid(CustomFilter*);
+  static std::mutex valid_filters_mutex_;
+  static std::unordered_set<CustomFilter*> valid_filters_;
 };
 
 }  // namespace pipeline