From: Pawel Wasowski
Date: Mon, 15 Mar 2021 15:52:07 +0000 (+0100)
Subject: [ML][Pipeline] Prevent deadlock in CustomFilter
X-Git-Tag: submit/tizen/20210319.060422~1^2
X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=565ee26b101852db079f38d566e942bdc70d4503;p=platform%2Fcore%2Fapi%2Fwebapi-plugins.git
[ML][Pipeline] Prevent deadlock in CustomFilter
ACR: TWDAPI-274
A deadlock could happen in 2 scenarios:
1. An attempt of unregistering a CustomFilter from its callback, (i.e.
calling tizen.ml.pipeline.unregisterCustomFilter('xxx') from xxx's
CustomFilter callback).
2. An attempt of disposing the pipeline using a CustomFilter which is
currently processing data.
This commit fixes the problems.
[Verification] Tested in Chrome DevTools with the snippets below, works
fine.
inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo("3D", "UINT8", [4, 20, 15, 1]);
outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo("flat", "UINT8", [1200]);
filterCB = function (input, output) {
console.log('hello');
tizen.ml.pipeline.unregisterCustomFilter("flattenFilter");
console.log('bye');
}
retValue = tizen.ml.pipeline.registerCustomFilter("flattenFilter", filterCB,
inputTI, outputTI,
console.warn);
pipelineDefinition = "videotestsrc num-buffers=3 " +
"! video/x-raw,width=20,height=15,format=BGRA " +
"! tensor_converter " +
"! tensor_filter framework=custom-easy model=flattenFilter "
+ "! fakesink";
pipeline = tizen.ml.pipeline.createPipeline(pipelineDefinition);
pipeline.start();
// hello
// WebAPIException {name: "AbortError", message:
// "CustomFilter has thrown exception: InvalidStateErr CustomFilter has
// thrown exception: InvalidStateError: The custom filter is processing
// data now. Stop the pipeline to unregister the filter."}
//
///////////////////////////////////////////////////////////////////////////
inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo("3D", "UINT8", [4, 20, 15, 1]);
outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo("ti1", "UINT8", [1200]);
customFilter = function (input, output) {
try {
inputTI.dispose();
outputTI.dispose();
pipeline.stop();
pipeline.dispose();
} catch (err) {
console.warn(err);
}
}
tizen.ml.pipeline.registerCustomFilter("flattenFilter", customFilter, inputTI, outputTI);
pipelineDefinition = "videotestsrc num-buffers=3 " +
"! video/x-raw,width=20,height=15,format=BGRA " +
"! tensor_converter " +
"! tensor_filter framework=custom-easy model=flattenFilter " +
"! fakesink";
pipeline = tizen.ml.pipeline.createPipeline(pipelineDefinition);
pipeline.start();
// WebAPIException {name: "InvalidStateError",
// message: "Pipeline cannot be disposed when at least one custom filter
// is currently processing data.",
//
///////////////////////////////////////////////////////////////////////////
// Valid CustomFilter callback - the happy scenario
var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenAndSet123 = function(input, output) {
console.log("Custom filter called");
var rawOutputData = new Uint8Array(1200);
for (var i = 0; i < rawOutputData.length; ++i) {
rawOutputData[i] = 123;
}
output.setTensorRawData(0, rawOutputData);
return 0;
}
tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenAndSet123, inputTI,
outputTI, function errorCallback(error) {
console.warn('custom filter error:') ; console.warn(error);
});
var pipeline_def = "videotestsrc num-buffers=3 "
+ "! video/x-raw,width=20,height=15,format=BGRA "
+ "! tensor_converter "
+ "! tensor_filter framework=custom-easy model=testfilter2 "
+ "! appsink name=mysink";
var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
state => {console.log(state);})
pipeline.registerSinkListener('mysink', function(sinkName, data) {
console.log('SinkListener for "' + sinkName + '" sink called');
console.log(data);
})
// READY
// Custom filter called
// PAUSED
pipeline.start()
// PLAYING
//
////////////////////////////////////////////////////////////
// Valid CustomFilter callback - the happy scenario; ignore the data
var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input, output) {
console.log("Custom filter called");
return 1; // ignore data
}
tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
outputTI, function errorCallback(error) {
console.warn('custom filter error:') ; console.warn(error);
});
var pipeline_def = "videotestsrc num-buffers=3 "
+ "! video/x-raw,width=20,height=15,format=BGRA "
+ "! tensor_converter "
+ "! tensor_filter framework=custom-easy model=testfilter2 "
+ "! appsink name=mysink";
var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
state => {console.log(state);})
pipeline.registerSinkListener('mysink', function(sinkName, data) {
console.log('SinkListener for "' + sinkName + '" sink called');
console.log(data);
})
// READY
// Custom filter called
// Custom filter called
// Custom filter called
// PAUSED
pipeline.start()
// PLAYING
////////////////////////////////////////////////////////////
// Valid CustomFilter callback - CustomFilter returns an error
var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input, output) {
console.log("Custom filter called");
return -1;
}
tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
outputTI, function errorCallback(error) {
console.warn('custom filter error:') ; console.warn(error);
});
var pipeline_def = "videotestsrc num-buffers=3 "
+ "! video/x-raw,width=20,height=15,format=BGRA "
+ "! tensor_converter "
+ "! tensor_filter framework=custom-easy model=testfilter2 "
+ "! appsink name=mysink";
var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
state => {console.log(state);})
pipeline.registerSinkListener('mysink', function(sinkName, data) {
console.log('SinkListener for "' + sinkName + '" sink called');
console.log(data);
})
// READY
// Custom filter called
// PAUSED
pipeline.start()
// PLAYING
////////////////////////////////////////////////////////////
// Invalid CustomFilterOutput.status
var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenPlusOne = function(input) {
console.log("Custom filter called");
return 123;
}
tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI,
outputTI, function errorCallback(error) {
console.warn('custom filter error:') ; console.warn(error);
});
var pipeline_def = "videotestsrc num-buffers=3 "
+ "! video/x-raw,width=20,height=15,format=BGRA "
+ "! tensor_converter "
+ "! tensor_filter framework=custom-easy model=testfilter2 "
+ "! appsink name=mysink";
var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
state => {console.log(state);})
pipeline.registerSinkListener('mysink', function(sinkName, data) {
console.log('SinkListener for "' + sinkName + '" sink called');
console.log(data);
})
// InvalidValuesError,
// message: "CustomFilterOutput.status === 1 is the only legal positive value"
////////////////////////////////////////////////////////////
// Check if {input, output}.dispose() and input.setTensorRawData()
// have any effect
var inputTI = new tizen.ml.TensorsInfo();
inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]);
var outputTI = new tizen.ml.TensorsInfo();
outputTI.addTensorInfo('ti1', 'UINT8', [1200]);
var flattenAndSet123 = function(input, output) {
console.log("Custom filter called");
// dispose should have no efect
input.dispose();
console.log('input count: ' + input.tensorsInfo.count);
// dispose should have no efect
input.dispose();
console.log('output count: ' + output.tensorsInfo.count);
var rawOutputData = new Uint8Array(1200);
for (var i = 0; i < rawOutputData.length; ++i) {
rawOutputData[i] = 123;
}
output.setTensorRawData(0, rawOutputData);
// this call should have no effect
input.setTensorRawData(0, rawOutputData);
return 0;
}
tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenAndSet123, inputTI,
outputTI, function errorCallback(error) {
console.warn('custom filter error:') ; console.warn(error);
});
var pipeline_def = "videotestsrc num-buffers=3 "
+ "! video/x-raw,width=20,height=15,format=BGRA "
+ "! tensor_converter "
+ "! tensor_filter framework=custom-easy model=testfilter2 "
+ "! appsink name=mysink";
var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def,
state => {console.log(state);})
pipeline.registerSinkListener('mysink', function(sinkName, data) {
console.log('SinkListener for "' + sinkName + '" sink called');
console.log(data);
})
Change-Id: Id6cda7782e3065248b2f2c5f859ca2af07c108a6
Signed-off-by: Pawel Wasowski
---
diff --git a/src/ml/js/ml_pipeline.js b/src/ml/js/ml_pipeline.js
index b7204bf4..a07accde 100755
--- a/src/ml/js/ml_pipeline.js
+++ b/src/ml/js/ml_pipeline.js
@@ -19,7 +19,12 @@ var kSinkListenerNamePrefix = 'MLPipelineSinkListener';
var kCustomFilterListenerNamePrefix = 'MLPipelineCustomFilterListener';
//PipelineManager::createPipeline() begin
-var ValidPipelineDisposeExceptions = ['NotFoundError', 'NotSupportedError', 'AbortError'];
+var ValidPipelineDisposeExceptions = [
+ 'InvalidStateError',
+ 'NotFoundError',
+ 'NotSupportedError',
+ 'AbortError'
+];
var nextPipelineId = 1;
function NextPipelineId() {
@@ -915,6 +920,7 @@ MachineLearningPipeline.prototype.registerCustomFilter = function() {
//Pipeline::unregisterCustomFilter() begin
var ValidUnregisterCustomFilterExceptions = [
+ 'InvalidStateError',
'InvalidValuesError',
'NotSupportedError',
'AbortError'
diff --git a/src/ml/ml_pipeline.cc b/src/ml/ml_pipeline.cc
index 0719a2d9..1bf6b74d 100644
--- a/src/ml/ml_pipeline.cc
+++ b/src/ml/ml_pipeline.cc
@@ -14,10 +14,14 @@
* limitations under the License.
*/
#include
+#include
+#include
+#include
#include "common/logger.h"
#include "common/picojson.h"
#include "ml_pipeline.h"
+#include "ml_pipeline_custom_filter.h"
#include "ml_pipeline_sink.h"
#include "ml_utils.h"
@@ -120,7 +124,28 @@ Pipeline::~Pipeline() {
return;
}
- Dispose();
+ auto disposed_successfully = PlatformResult{ErrorCode::ABORT_ERR};
+
+ /*
+ * Dispose() may fail if at least one CustomFilter is processing
+ * data. We'll make 5 attempts to increase the chance of successful
+ * disposal.
+ * It should succeed for almost all cases, but in principle it may fail.
+ */
+ for (int i = 0; !disposed_successfully && i < 5; ++i) {
+ disposed_successfully = Dispose();
+ if (!disposed_successfully) {
+ LoggerD("Disposal attempt number %d failed", i);
+ using namespace std::chrono_literals;
+ std::this_thread::sleep_for(500ms);
+ }
+ }
+
+ if (disposed_successfully) {
+ LoggerD("Successfully disposed the pipeline.");
+ } else {
+ LoggerE("Could not dispose the pipeline.");
+ }
}
void Pipeline::PipelineStateChangeListener(ml_pipeline_state_e state, void* user_data) {
@@ -183,6 +208,31 @@ PlatformResult Pipeline::Stop() {
PlatformResult Pipeline::Dispose() {
ScopeLogger("id_: [%d]", id_);
+ /*
+ * A deadlock (in nnstreamer implementation) happens in the following scenario:
+ * - CustomFilter::CustomFilterListener() is waiting for response from
+ * JS (stages 2-4 of data processing, as described in comments in ml_pipeline_api.js)
+ * - ml_pipeline_destroy() is then called from a different thread (main thread
+ * in JS apps).
+ *
+ * If not prevented here, it could occur when pipeline.dispose() would have
+ * been called from CustomFilter callback in JS.
+ *
+ * To avoid this, we check if any filter is processing data (there's no
+ * easy way to check only those filters, that are used in this pipeline) and
+ * stop them from entering data processing stage for a moment. Only then we let
+ * calling ml_pipeline_destroy().
+ *
+ * When filter_locks will be destroyed, CustomFilter callbacks will resume.
+ */
+ std::vector> filter_locks;
+ auto can_continue_dispose = CustomFilter::TryLockAllFilters(&filter_locks);
+ if (!can_continue_dispose) {
+ return PlatformResult{ErrorCode::INVALID_STATE_ERR,
+ "Pipeline cannot be disposed when at least one custom filter is "
+ "currently processing data."};
+ }
+
/*
* TODO in future commits:
*
diff --git a/src/ml/ml_pipeline_custom_filter.cc b/src/ml/ml_pipeline_custom_filter.cc
index fc54c93d..710481b3 100644
--- a/src/ml/ml_pipeline_custom_filter.cc
+++ b/src/ml/ml_pipeline_custom_filter.cc
@@ -48,6 +48,9 @@ const int CustomFilter::kCustomFilterError = -1;
const int CustomFilter::kCustomFilterIgnoreData = 1;
const int CustomFilter::kCustomFilterSuccess = 0;
+std::mutex CustomFilter::valid_filters_mutex_ = {};
+std::unordered_set CustomFilter::valid_filters_ = {};
+
PlatformResult CustomFilter::CreateAndRegisterCustomFilter(
const std::string& name, const std::string& listener_name, TensorsInfo* input_tensors_info_ptr,
TensorsInfo* output_tensors_info_ptr, common::Instance* instance_ptr,
@@ -116,12 +119,16 @@ CustomFilter::CustomFilter(const std::string& name, const std::string& listener_
instance_ptr_{instance_ptr},
tensors_info_manager_ptr_{tensors_info_manager_ptr},
tensors_data_manager_ptr_{tensors_data_manager_ptr},
+ threads_waiting_for_js_responses_{0},
main_thread_id_{main_thread_id} {
ScopeLogger(
"name_: [%s], listener_name_: [%s], input_tensors_info::id: [%d], "
"output_tensors_info::id: [%d]",
name_.c_str(), listener_name_.c_str(), input_tensors_info_ptr_->Id(),
output_tensors_info_ptr_->Id());
+
+ std::lock_guard lock{CustomFilter::valid_filters_mutex_};
+ CustomFilter::valid_filters_.insert(this);
}
CustomFilter::~CustomFilter() {
@@ -142,6 +149,20 @@ PlatformResult CustomFilter::Unregister() {
return PlatformResult{};
}
+ std::lock_guard valid_filters_lock{CustomFilter::valid_filters_mutex_};
+ /*
+ * We ensure, that no thread is currently waiting for
+ * response - if we'd try to unregister a thread that is currently
+ * sleeping, we could cause a deadlock.
+ */
+ auto request_id_to_js_response_lock =
+ LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+ if (!request_id_to_js_response_lock) {
+ return PlatformResult{
+ ErrorCode::INVALID_STATE_ERR,
+ "The custom filter is processing data now. Stop the pipeline to unregister the filter."};
+ }
+
auto ret = ml_pipeline_custom_easy_filter_unregister(custom_filter_);
if (ML_ERROR_NONE != ret) {
LoggerE("ml_pipeline_custom_easy_filter_unregister() failed: [%d] (%s)", ret,
@@ -150,6 +171,8 @@ PlatformResult CustomFilter::Unregister() {
}
LoggerD("ml_pipeline_custom_easy_filter_unregister() succeeded");
+ CustomFilter::valid_filters_.erase(this);
+
custom_filter_ = nullptr;
return PlatformResult{};
@@ -242,6 +265,12 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens
CustomFilter* custom_filter_ptr = static_cast(user_data);
+ auto lock = CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsValid(custom_filter_ptr);
+ if (!lock) {
+ LoggerD("This custom filter is invalid. Pipeline will stop.");
+ return kCustomFilterError;
+ }
+
picojson::value message{picojson::object{}};
int request_id = -1;
TensorsData *input_tensors_data_ptr = nullptr, *output_tensors_data_ptr = nullptr;
@@ -255,7 +284,6 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens
native_input_tensors_data_handle, native_output_tensors_data_handle, &message, &request_id,
&input_tensors_data_ptr, &output_tensors_data_ptr);
- std::unique_lock lock{custom_filter_ptr->request_id_to_js_response_mutex_};
common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, message);
if (!success) {
@@ -263,6 +291,8 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens
return kCustomFilterError;
}
+ custom_filter_ptr->threads_waiting_for_js_responses_++;
+
/*
* Stage 1. of data processing (see the comment above customFilterWrapper in
* ml_pipeline.js) ends here.
@@ -279,11 +309,63 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens
*/
auto js_response_status = custom_filter_ptr->request_id_to_js_response_status_[request_id];
custom_filter_ptr->request_id_to_js_response_status_.erase(request_id);
- lock.unlock();
+
+ custom_filter_ptr->threads_waiting_for_js_responses_--;
return js_response_status;
}
+std::unique_lock
+CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses() {
+ ScopeLogger("name_: [%s], listener_name_: [%s]", name_.c_str(), listener_name_.c_str());
+
+ std::unique_lock lock{request_id_to_js_response_mutex_};
+ LoggerD("threads_waiting_for_js_responses_: [%d], lock acquired: [%s]",
+ threads_waiting_for_js_responses_,
+ 0 == threads_waiting_for_js_responses_ ? "true" : "false");
+ if (0 == threads_waiting_for_js_responses_) {
+ return lock;
+ }
+
+ return {};
+}
+
+std::unique_lock CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsValid(
+ CustomFilter* custom_filter_ptr) {
+ ScopeLogger();
+
+ std::lock_guard lock{CustomFilter::valid_filters_mutex_};
+ if (CustomFilter::valid_filters_.count(custom_filter_ptr)) {
+ LoggerD("Filter is valid");
+ return std::unique_lock{custom_filter_ptr->request_id_to_js_response_mutex_};
+ }
+
+ LoggerD("Filter is invalid");
+ return {};
+}
+
+bool CustomFilter::TryLockAllFilters(
+ std::vector>* output_filter_locks) {
+ ScopeLogger();
+
+ std::lock_guard valid_filters_lock{CustomFilter::valid_filters_mutex_};
+
+ for (auto* filter : CustomFilter::valid_filters_) {
+ auto filter_lock = filter->LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+ if (filter_lock) {
+ LoggerD("[%s] CustomFilter locked", filter->name_.c_str());
+ output_filter_locks->push_back(std::move(filter_lock));
+ } else {
+ LoggerD("[%s] CustomFilter is waiting for JS responses and cannot be locked",
+ filter->name_.c_str());
+ output_filter_locks->clear();
+ return false;
+ }
+ }
+
+ return true;
+}
+
} // namespace pipeline
} // namespace ml
} // namespace extension
\ No newline at end of file
diff --git a/src/ml/ml_pipeline_custom_filter.h b/src/ml/ml_pipeline_custom_filter.h
index 5210fd4a..33f9d21d 100644
--- a/src/ml/ml_pipeline_custom_filter.h
+++ b/src/ml/ml_pipeline_custom_filter.h
@@ -64,6 +64,16 @@ class CustomFilter {
static const int kCustomFilterIgnoreData;
static const int kCustomFilterSuccess;
+ /*
+ * If any filter is currently processing data, this function will
+ * return "false" and an empty vector pointed by output_filter_locks.
+ * If no filter is currently processing data, this function will return
+ * "true" and will lock request_id_to_js_response_mutex_ of each
+ * valid custom filter to prevent from executing JS code from
+ * within JS custom filter callbacks.
+ */
+ static bool TryLockAllFilters(std::vector>* output_filter_locks);
+
private:
CustomFilter(const std::string& name, const std::string& listener_name,
TensorsInfo* input_tensors_info, TensorsInfo* output_tensors_info,
@@ -85,6 +95,14 @@ class CustomFilter {
int getRequestId();
+ /*
+ * If no CustomFilter thread is sleeping, waiting for responses from JS,
+ * this function will return locked request_id_to_js_response_mutex_,
+ * thus preventing CustomFilterListener called in newly spawned threads
+ * from incrementing threads_waiting_for_js_responses_.
+ */
+ std::unique_lock LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+
const std::string name_;
const std::string listener_name_;
TensorsInfo* input_tensors_info_ptr_;
@@ -96,8 +114,22 @@ class CustomFilter {
std::mutex request_id_to_js_response_mutex_;
std::unordered_map request_id_to_js_response_status_;
- std::condition_variable cv_;
+ /*
+ * We keep track of the number of threads waiting for JS responses
+ * because Unregister() MUSTN'T be called when at least one thread
+ * is sleeping (calling it could result in a deadlock).
+ *
+ * This variable should only be accessed by a thread that currently
+ * locks request_id_to_js_response_mutex_.
+ */
+ int threads_waiting_for_js_responses_;
+ std::condition_variable cv_; // used with request_id_to_js_response_mutex_
+
std::thread::id main_thread_id_;
+
+ static std::unique_lock LockRequestIdToJSResponseMutexIfFilterIsValid(CustomFilter*);
+ static std::mutex valid_filters_mutex_;
+ static std::unordered_set valid_filters_;
};
} // namespace pipeline