From: Pawel Wasowski Date: Mon, 15 Mar 2021 15:52:07 +0000 (+0100) Subject: [ML][Pipeline] Prevent deadlock in CustomFilter X-Git-Tag: submit/tizen/20210319.060422~1^2 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=565ee26b101852db079f38d566e942bdc70d4503;p=platform%2Fcore%2Fapi%2Fwebapi-plugins.git [ML][Pipeline] Prevent deadlock in CustomFilter ACR: TWDAPI-274 A deadlock could happen in 2 scenarios: 1. An attempt of unregistering a CustomFilter from its callback (i.e. calling tizen.ml.pipeline.unregisterCustomFilter('xxx') from xxx's CustomFilter callback). 2. An attempt of disposing the pipeline using a CustomFilter which is currently processing data. This commit fixes the problems. [Verification] Tested in Chrome DevTools with the snippets below, works fine. inputTI = new tizen.ml.TensorsInfo(); inputTI.addTensorInfo("3D", "UINT8", [4, 20, 15, 1]); outputTI = new tizen.ml.TensorsInfo(); outputTI.addTensorInfo("flat", "UINT8", [1200]); filterCB = function (input, output) { console.log('hello'); tizen.ml.pipeline.unregisterCustomFilter("flattenFilter"); console.log('bye'); } retValue = tizen.ml.pipeline.registerCustomFilter("flattenFilter", filterCB, inputTI, outputTI, console.warn); pipelineDefinition = "videotestsrc num-buffers=3 " + "! video/x-raw,width=20,height=15,format=BGRA " + "! tensor_converter " + "! tensor_filter framework=custom-easy model=flattenFilter " + "! fakesink"; pipeline = tizen.ml.pipeline.createPipeline(pipelineDefinition); pipeline.start(); // hello // WebAPIException {name: "AbortError", message: // "CustomFilter has thrown exception: InvalidStateError: The custom filter is processing // data now. 
Stop the pipeline to unregister the filter."} // /////////////////////////////////////////////////////////////////////////// inputTI = new tizen.ml.TensorsInfo(); inputTI.addTensorInfo("3D", "UINT8", [4, 20, 15, 1]); outputTI = new tizen.ml.TensorsInfo(); outputTI.addTensorInfo("ti1", "UINT8", [1200]); customFilter = function (input, output) { try { inputTI.dispose(); outputTI.dispose(); pipeline.stop(); pipeline.dispose(); } catch (err) { console.warn(err); } } tizen.ml.pipeline.registerCustomFilter("flattenFilter", customFilter, inputTI, outputTI); pipelineDefinition = "videotestsrc num-buffers=3 " + "! video/x-raw,width=20,height=15,format=BGRA " + "! tensor_converter " + "! tensor_filter framework=custom-easy model=flattenFilter " + "! fakesink"; pipeline = tizen.ml.pipeline.createPipeline(pipelineDefinition); pipeline.start(); // WebAPIException {name: "InvalidStateError", // message: "Pipeline cannot be disposed when at least one custom filter // is currently processing data.", // /////////////////////////////////////////////////////////////////////////// // Valid CustomFilter callback - the happy scenario var inputTI = new tizen.ml.TensorsInfo(); inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]); var outputTI = new tizen.ml.TensorsInfo(); outputTI.addTensorInfo('ti1', 'UINT8', [1200]); var flattenAndSet123 = function(input, output) { console.log("Custom filter called"); var rawOutputData = new Uint8Array(1200); for (var i = 0; i < rawOutputData.length; ++i) { rawOutputData[i] = 123; } output.setTensorRawData(0, rawOutputData); return 0; } tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenAndSet123, inputTI, outputTI, function errorCallback(error) { console.warn('custom filter error:') ; console.warn(error); }); var pipeline_def = "videotestsrc num-buffers=3 " + "! video/x-raw,width=20,height=15,format=BGRA " + "! tensor_converter " + "! tensor_filter framework=custom-easy model=testfilter2 " + "! 
appsink name=mysink"; var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def, state => {console.log(state);}) pipeline.registerSinkListener('mysink', function(sinkName, data) { console.log('SinkListener for "' + sinkName + '" sink called'); console.log(data); }) // READY // Custom filter called // PAUSED pipeline.start() // PLAYING // //////////////////////////////////////////////////////////// // Valid CustomFilter callback - the happy scenario; ignore the data var inputTI = new tizen.ml.TensorsInfo(); inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]); var outputTI = new tizen.ml.TensorsInfo(); outputTI.addTensorInfo('ti1', 'UINT8', [1200]); var flattenPlusOne = function(input, output) { console.log("Custom filter called"); return 1; // ignore data } tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI, outputTI, function errorCallback(error) { console.warn('custom filter error:') ; console.warn(error); }); var pipeline_def = "videotestsrc num-buffers=3 " + "! video/x-raw,width=20,height=15,format=BGRA " + "! tensor_converter " + "! tensor_filter framework=custom-easy model=testfilter2 " + "! 
appsink name=mysink"; var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def, state => {console.log(state);}) pipeline.registerSinkListener('mysink', function(sinkName, data) { console.log('SinkListener for "' + sinkName + '" sink called'); console.log(data); }) // READY // Custom filter called // Custom filter called // Custom filter called // PAUSED pipeline.start() // PLAYING //////////////////////////////////////////////////////////// // Valid CustomFilter callback - CustomFilter returns an error var inputTI = new tizen.ml.TensorsInfo(); inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]); var outputTI = new tizen.ml.TensorsInfo(); outputTI.addTensorInfo('ti1', 'UINT8', [1200]); var flattenPlusOne = function(input, output) { console.log("Custom filter called"); return -1; } tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI, outputTI, function errorCallback(error) { console.warn('custom filter error:') ; console.warn(error); }); var pipeline_def = "videotestsrc num-buffers=3 " + "! video/x-raw,width=20,height=15,format=BGRA " + "! tensor_converter " + "! tensor_filter framework=custom-easy model=testfilter2 " + "! 
appsink name=mysink"; var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def, state => {console.log(state);}) pipeline.registerSinkListener('mysink', function(sinkName, data) { console.log('SinkListener for "' + sinkName + '" sink called'); console.log(data); }) // READY // Custom filter called // PAUSED pipeline.start() // PLAYING //////////////////////////////////////////////////////////// // Invalid CustomFilterOutput.status var inputTI = new tizen.ml.TensorsInfo(); inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]); var outputTI = new tizen.ml.TensorsInfo(); outputTI.addTensorInfo('ti1', 'UINT8', [1200]); var flattenPlusOne = function(input) { console.log("Custom filter called"); return 123; } tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenPlusOne, inputTI, outputTI, function errorCallback(error) { console.warn('custom filter error:') ; console.warn(error); }); var pipeline_def = "videotestsrc num-buffers=3 " + "! video/x-raw,width=20,height=15,format=BGRA " + "! tensor_converter " + "! tensor_filter framework=custom-easy model=testfilter2 " + "! 
appsink name=mysink"; var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def, state => {console.log(state);}) pipeline.registerSinkListener('mysink', function(sinkName, data) { console.log('SinkListener for "' + sinkName + '" sink called'); console.log(data); }) // InvalidValuesError, // message: "CustomFilterOutput.status === 1 is the only legal positive value" //////////////////////////////////////////////////////////// // Check if {input, output}.dispose() and input.setTensorRawData() // have any effect var inputTI = new tizen.ml.TensorsInfo(); inputTI.addTensorInfo('ti1', 'UINT8', [4, 20, 15, 1]); var outputTI = new tizen.ml.TensorsInfo(); outputTI.addTensorInfo('ti1', 'UINT8', [1200]); var flattenAndSet123 = function(input, output) { console.log("Custom filter called"); // dispose should have no effect input.dispose(); console.log('input count: ' + input.tensorsInfo.count); // dispose should have no effect input.dispose(); console.log('output count: ' + output.tensorsInfo.count); var rawOutputData = new Uint8Array(1200); for (var i = 0; i < rawOutputData.length; ++i) { rawOutputData[i] = 123; } output.setTensorRawData(0, rawOutputData); // this call should have no effect input.setTensorRawData(0, rawOutputData); return 0; } tizen.ml.pipeline.registerCustomFilter('testfilter2', flattenAndSet123, inputTI, outputTI, function errorCallback(error) { console.warn('custom filter error:') ; console.warn(error); }); var pipeline_def = "videotestsrc num-buffers=3 " + "! video/x-raw,width=20,height=15,format=BGRA " + "! tensor_converter " + "! tensor_filter framework=custom-easy model=testfilter2 " + "! 
appsink name=mysink"; var pipeline = tizen.ml.pipeline.createPipeline(pipeline_def, state => {console.log(state);}) pipeline.registerSinkListener('mysink', function(sinkName, data) { console.log('SinkListener for "' + sinkName + '" sink called'); console.log(data); }) Change-Id: Id6cda7782e3065248b2f2c5f859ca2af07c108a6 Signed-off-by: Pawel Wasowski --- diff --git a/src/ml/js/ml_pipeline.js b/src/ml/js/ml_pipeline.js index b7204bf4..a07accde 100755 --- a/src/ml/js/ml_pipeline.js +++ b/src/ml/js/ml_pipeline.js @@ -19,7 +19,12 @@ var kSinkListenerNamePrefix = 'MLPipelineSinkListener'; var kCustomFilterListenerNamePrefix = 'MLPipelineCustomFilterListener'; //PipelineManager::createPipeline() begin -var ValidPipelineDisposeExceptions = ['NotFoundError', 'NotSupportedError', 'AbortError']; +var ValidPipelineDisposeExceptions = [ + 'InvalidStateError', + 'NotFoundError', + 'NotSupportedError', + 'AbortError' +]; var nextPipelineId = 1; function NextPipelineId() { @@ -915,6 +920,7 @@ MachineLearningPipeline.prototype.registerCustomFilter = function() { //Pipeline::unregisterCustomFilter() begin var ValidUnregisterCustomFilterExceptions = [ + 'InvalidStateError', 'InvalidValuesError', 'NotSupportedError', 'AbortError' diff --git a/src/ml/ml_pipeline.cc b/src/ml/ml_pipeline.cc index 0719a2d9..1bf6b74d 100644 --- a/src/ml/ml_pipeline.cc +++ b/src/ml/ml_pipeline.cc @@ -14,10 +14,14 @@ * limitations under the License. */ #include +#include +#include +#include #include "common/logger.h" #include "common/picojson.h" #include "ml_pipeline.h" +#include "ml_pipeline_custom_filter.h" #include "ml_pipeline_sink.h" #include "ml_utils.h" @@ -120,7 +124,28 @@ Pipeline::~Pipeline() { return; } - Dispose(); + auto disposed_successfully = PlatformResult{ErrorCode::ABORT_ERR}; + + /* + * Dispose() may fail if at least one CustomFilter is processing + * data. We'll make 5 attempts to increase the chance of successful + * disposal. 
+ * It should succeed for almost all cases, but in principle it may fail. + */ + for (int i = 0; !disposed_successfully && i < 5; ++i) { + disposed_successfully = Dispose(); + if (!disposed_successfully) { + LoggerD("Disposal attempt number %d failed", i); + using namespace std::chrono_literals; + std::this_thread::sleep_for(500ms); + } + } + + if (disposed_successfully) { + LoggerD("Successfully disposed the pipeline."); + } else { + LoggerE("Could not dispose the pipeline."); + } } void Pipeline::PipelineStateChangeListener(ml_pipeline_state_e state, void* user_data) { @@ -183,6 +208,31 @@ PlatformResult Pipeline::Stop() { PlatformResult Pipeline::Dispose() { ScopeLogger("id_: [%d]", id_); + /* + * A deadlock (in nnstreamer implementation) happens in the following scenario: + * - CustomFilter::CustomFilterListener() is waiting for response from + * JS (stages 2-4 of data processing, as described in comments in ml_pipeline_api.js) + * - ml_pipeline_destroy() is then called from a different thread (main thread + * in JS apps). + * + * If not prevented here, it could occur when pipeline.dispose() would have + * been called from CustomFilter callback in JS. + * + * To avoid this, we check if any filter is processing data (there's no + * easy way to check only those filters, that are used in this pipeline) and + * stop them from entering data processing stage for a moment. Only then we let + * calling ml_pipeline_destroy(). + * + * When filter_locks will be destroyed, CustomFilter callbacks will resume. 
+ */ + std::vector> filter_locks; + auto can_continue_dispose = CustomFilter::TryLockAllFilters(&filter_locks); + if (!can_continue_dispose) { + return PlatformResult{ErrorCode::INVALID_STATE_ERR, + "Pipeline cannot be disposed when at least one custom filter is " + "currently processing data."}; + } + /* * TODO in future commits: * diff --git a/src/ml/ml_pipeline_custom_filter.cc b/src/ml/ml_pipeline_custom_filter.cc index fc54c93d..710481b3 100644 --- a/src/ml/ml_pipeline_custom_filter.cc +++ b/src/ml/ml_pipeline_custom_filter.cc @@ -48,6 +48,9 @@ const int CustomFilter::kCustomFilterError = -1; const int CustomFilter::kCustomFilterIgnoreData = 1; const int CustomFilter::kCustomFilterSuccess = 0; +std::mutex CustomFilter::valid_filters_mutex_ = {}; +std::unordered_set CustomFilter::valid_filters_ = {}; + PlatformResult CustomFilter::CreateAndRegisterCustomFilter( const std::string& name, const std::string& listener_name, TensorsInfo* input_tensors_info_ptr, TensorsInfo* output_tensors_info_ptr, common::Instance* instance_ptr, @@ -116,12 +119,16 @@ CustomFilter::CustomFilter(const std::string& name, const std::string& listener_ instance_ptr_{instance_ptr}, tensors_info_manager_ptr_{tensors_info_manager_ptr}, tensors_data_manager_ptr_{tensors_data_manager_ptr}, + threads_waiting_for_js_responses_{0}, main_thread_id_{main_thread_id} { ScopeLogger( "name_: [%s], listener_name_: [%s], input_tensors_info::id: [%d], " "output_tensors_info::id: [%d]", name_.c_str(), listener_name_.c_str(), input_tensors_info_ptr_->Id(), output_tensors_info_ptr_->Id()); + + std::lock_guard lock{CustomFilter::valid_filters_mutex_}; + CustomFilter::valid_filters_.insert(this); } CustomFilter::~CustomFilter() { @@ -142,6 +149,20 @@ PlatformResult CustomFilter::Unregister() { return PlatformResult{}; } + std::lock_guard valid_filters_lock{CustomFilter::valid_filters_mutex_}; + /* + * We ensure, that no thread is currently waiting for + * response - if we'd try to unregister a thread that is 
currently + * sleeping, we could cause a deadlock. + */ + auto request_id_to_js_response_lock = + LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses(); + if (!request_id_to_js_response_lock) { + return PlatformResult{ + ErrorCode::INVALID_STATE_ERR, + "The custom filter is processing data now. Stop the pipeline to unregister the filter."}; + } + auto ret = ml_pipeline_custom_easy_filter_unregister(custom_filter_); if (ML_ERROR_NONE != ret) { LoggerE("ml_pipeline_custom_easy_filter_unregister() failed: [%d] (%s)", ret, @@ -150,6 +171,8 @@ PlatformResult CustomFilter::Unregister() { } LoggerD("ml_pipeline_custom_easy_filter_unregister() succeeded"); + CustomFilter::valid_filters_.erase(this); + custom_filter_ = nullptr; return PlatformResult{}; @@ -242,6 +265,12 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens CustomFilter* custom_filter_ptr = static_cast(user_data); + auto lock = CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsValid(custom_filter_ptr); + if (!lock) { + LoggerD("This custom filter is invalid. Pipeline will stop."); + return kCustomFilterError; + } + picojson::value message{picojson::object{}}; int request_id = -1; TensorsData *input_tensors_data_ptr = nullptr, *output_tensors_data_ptr = nullptr; @@ -255,7 +284,6 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens native_input_tensors_data_handle, native_output_tensors_data_handle, &message, &request_id, &input_tensors_data_ptr, &output_tensors_data_ptr); - std::unique_lock lock{custom_filter_ptr->request_id_to_js_response_mutex_}; common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, message); if (!success) { @@ -263,6 +291,8 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens return kCustomFilterError; } + custom_filter_ptr->threads_waiting_for_js_responses_++; + /* * Stage 1. of data processing (see the comment above customFilterWrapper in * ml_pipeline.js) ends here. 
@@ -279,11 +309,63 @@ int CustomFilter::CustomFilterListener(const ml_tensors_data_h native_input_tens */ auto js_response_status = custom_filter_ptr->request_id_to_js_response_status_[request_id]; custom_filter_ptr->request_id_to_js_response_status_.erase(request_id); - lock.unlock(); + + custom_filter_ptr->threads_waiting_for_js_responses_--; return js_response_status; } +std::unique_lock +CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses() { + ScopeLogger("name_: [%s], listener_name_: [%s]", name_.c_str(), listener_name_.c_str()); + + std::unique_lock lock{request_id_to_js_response_mutex_}; + LoggerD("threads_waiting_for_js_responses_: [%d], lock acquired: [%s]", + threads_waiting_for_js_responses_, + 0 == threads_waiting_for_js_responses_ ? "true" : "false"); + if (0 == threads_waiting_for_js_responses_) { + return lock; + } + + return {}; +} + +std::unique_lock CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsValid( + CustomFilter* custom_filter_ptr) { + ScopeLogger(); + + std::lock_guard lock{CustomFilter::valid_filters_mutex_}; + if (CustomFilter::valid_filters_.count(custom_filter_ptr)) { + LoggerD("Filter is valid"); + return std::unique_lock{custom_filter_ptr->request_id_to_js_response_mutex_}; + } + + LoggerD("Filter is invalid"); + return {}; +} + +bool CustomFilter::TryLockAllFilters( + std::vector>* output_filter_locks) { + ScopeLogger(); + + std::lock_guard valid_filters_lock{CustomFilter::valid_filters_mutex_}; + + for (auto* filter : CustomFilter::valid_filters_) { + auto filter_lock = filter->LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses(); + if (filter_lock) { + LoggerD("[%s] CustomFilter locked", filter->name_.c_str()); + output_filter_locks->push_back(std::move(filter_lock)); + } else { + LoggerD("[%s] CustomFilter is waiting for JS responses and cannot be locked", + filter->name_.c_str()); + output_filter_locks->clear(); + return false; + } + } + + return true; +} + } // namespace pipeline } 
// namespace ml } // namespace extension \ No newline at end of file diff --git a/src/ml/ml_pipeline_custom_filter.h b/src/ml/ml_pipeline_custom_filter.h index 5210fd4a..33f9d21d 100644 --- a/src/ml/ml_pipeline_custom_filter.h +++ b/src/ml/ml_pipeline_custom_filter.h @@ -64,6 +64,16 @@ class CustomFilter { static const int kCustomFilterIgnoreData; static const int kCustomFilterSuccess; + /* + * If any filter is currently processing data, this function will + * return "false" and an empty vector pointed by output_filter_locks. + * If no filter is currently processing data, this function will return + * "true" and will lock request_id_to_js_response_mutex_ of each + * valid custom filter to prevent from executing JS code from + * within JS custom filter callbacks. + */ + static bool TryLockAllFilters(std::vector>* output_filter_locks); + private: CustomFilter(const std::string& name, const std::string& listener_name, TensorsInfo* input_tensors_info, TensorsInfo* output_tensors_info, @@ -85,6 +95,14 @@ class CustomFilter { int getRequestId(); + /* + * If no CustomFilter thread is sleeping, waiting for responses from JS, + * this function will return locked request_id_to_js_response_mutex_, + * thus preventing CustomFilterListener called in newly spawned threads + * from incrementing threads_waiting_for_js_responses_. + */ + std::unique_lock LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses(); + const std::string name_; const std::string listener_name_; TensorsInfo* input_tensors_info_ptr_; @@ -96,8 +114,22 @@ class CustomFilter { std::mutex request_id_to_js_response_mutex_; std::unordered_map request_id_to_js_response_status_; - std::condition_variable cv_; + /* + * We keep track of the number of threads waiting for JS responses + * because Unregister() MUSTN'T be called when at least one thread + * is sleeping (calling it could result in a deadlock). 
+ * + * This variable should only be accessed by a thread that currently + * locks request_id_to_js_response_mutex_. + */ + int threads_waiting_for_js_responses_; + std::condition_variable cv_; // used with request_id_to_js_response_mutex_ + std::thread::id main_thread_id_; + + static std::unique_lock LockRequestIdToJSResponseMutexIfFilterIsValid(CustomFilter*); + static std::mutex valid_filters_mutex_; + static std::unordered_set valid_filters_; }; } // namespace pipeline