* limitations under the License.
*/
#include <tizen.h>
+#include <chrono>
+#include <mutex>
+#include <thread>
#include "common/logger.h"
#include "common/picojson.h"
#include "ml_pipeline.h"
+#include "ml_pipeline_custom_filter.h"
#include "ml_pipeline_sink.h"
#include "ml_utils.h"
return;
}
- Dispose();
+ auto disposed_successfully = PlatformResult{ErrorCode::ABORT_ERR};
+
+ /*
+ * Dispose() may fail if at least one CustomFilter is processing
+ * data. We'll make 5 attempts to increase the chance of successful
+ * disposal.
+ * It should succeed for almost all cases, but in principle it may fail.
+ */
+ for (int i = 0; !disposed_successfully && i < 5; ++i) {
+ disposed_successfully = Dispose();
+ if (!disposed_successfully) {
+ LoggerD("Disposal attempt number %d failed", i);
+ using namespace std::chrono_literals;
+ std::this_thread::sleep_for(500ms);
+ }
+ }
+
+ if (disposed_successfully) {
+ LoggerD("Successfully disposed the pipeline.");
+ } else {
+ LoggerE("Could not dispose the pipeline.");
+ }
}
void Pipeline::PipelineStateChangeListener(ml_pipeline_state_e state, void* user_data) {
ScopeLogger("id_: [%d]", id_);
/*
+ * A deadlock (in nnstreamer implementation) happens in the following scenario:
+ * - CustomFilter::CustomFilterListener() is waiting for response from
+ * JS (stages 2-4 of data processing, as described in comments in ml_pipeline_api.js)
+ * - ml_pipeline_destroy() is then called from a different thread (main thread
+ * in JS apps).
+ *
+ * If not prevented here, it could occur when pipeline.dispose() would have
+ * been called from CustomFilter callback in JS.
+ *
+   * To avoid this, we check whether any filter is processing data (there is no
+   * easy way to check only the filters that are used in this pipeline) and
+   * briefly stop them from entering the data processing stage. Only then do we
+   * allow ml_pipeline_destroy() to be called.
+   *
+   * When filter_locks is destroyed, the CustomFilter callbacks will resume.
+ */
+ std::vector<std::unique_lock<std::mutex>> filter_locks;
+ auto can_continue_dispose = CustomFilter::TryLockAllFilters(&filter_locks);
+ if (!can_continue_dispose) {
+ return PlatformResult{ErrorCode::INVALID_STATE_ERR,
+ "Pipeline cannot be disposed when at least one custom filter is "
+ "currently processing data."};
+ }
+
+ /*
* TODO in future commits:
*
* Release all nodes belonging to this pipeline and
const int CustomFilter::kCustomFilterIgnoreData = 1;
const int CustomFilter::kCustomFilterSuccess = 0;
+std::mutex CustomFilter::valid_filters_mutex_ = {};
+std::unordered_set<CustomFilter*> CustomFilter::valid_filters_ = {};
+
PlatformResult CustomFilter::CreateAndRegisterCustomFilter(
const std::string& name, const std::string& listener_name, TensorsInfo* input_tensors_info_ptr,
TensorsInfo* output_tensors_info_ptr, common::Instance* instance_ptr,
instance_ptr_{instance_ptr},
tensors_info_manager_ptr_{tensors_info_manager_ptr},
tensors_data_manager_ptr_{tensors_data_manager_ptr},
+ threads_waiting_for_js_responses_{0},
main_thread_id_{main_thread_id} {
ScopeLogger(
"name_: [%s], listener_name_: [%s], input_tensors_info::id: [%d], "
"output_tensors_info::id: [%d]",
name_.c_str(), listener_name_.c_str(), input_tensors_info_ptr_->Id(),
output_tensors_info_ptr_->Id());
+
+ std::lock_guard<std::mutex> lock{CustomFilter::valid_filters_mutex_};
+ CustomFilter::valid_filters_.insert(this);
}
CustomFilter::~CustomFilter() {
return PlatformResult{};
}
+ std::lock_guard<std::mutex> valid_filters_lock{CustomFilter::valid_filters_mutex_};
+ /*
+   * We ensure that no thread is currently waiting for a JS
+   * response - if we unregistered the filter while a thread is
+   * sleeping, we could cause a deadlock.
+ */
+ auto request_id_to_js_response_lock =
+ LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+ if (!request_id_to_js_response_lock) {
+ return PlatformResult{
+ ErrorCode::INVALID_STATE_ERR,
+ "The custom filter is processing data now. Stop the pipeline to unregister the filter."};
+ }
+
auto ret = ml_pipeline_custom_easy_filter_unregister(custom_filter_);
if (ML_ERROR_NONE != ret) {
LoggerE("ml_pipeline_custom_easy_filter_unregister() failed: [%d] (%s)", ret,
}
LoggerD("ml_pipeline_custom_easy_filter_unregister() succeeded");
+ CustomFilter::valid_filters_.erase(this);
+
custom_filter_ = nullptr;
return PlatformResult{};
CustomFilter* custom_filter_ptr = static_cast<CustomFilter*>(user_data);
+ auto lock = CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsValid(custom_filter_ptr);
+ if (!lock) {
+ LoggerD("This custom filter is invalid. Pipeline will stop.");
+ return kCustomFilterError;
+ }
+
picojson::value message{picojson::object{}};
int request_id = -1;
TensorsData *input_tensors_data_ptr = nullptr, *output_tensors_data_ptr = nullptr;
native_input_tensors_data_handle, native_output_tensors_data_handle, &message, &request_id,
&input_tensors_data_ptr, &output_tensors_data_ptr);
- std::unique_lock<std::mutex> lock{custom_filter_ptr->request_id_to_js_response_mutex_};
common::Instance::PostMessage(custom_filter_ptr->instance_ptr_, message);
if (!success) {
return kCustomFilterError;
}
+ custom_filter_ptr->threads_waiting_for_js_responses_++;
+
/*
* Stage 1. of data processing (see the comment above customFilterWrapper in
* ml_pipeline.js) ends here.
*/
auto js_response_status = custom_filter_ptr->request_id_to_js_response_status_[request_id];
custom_filter_ptr->request_id_to_js_response_status_.erase(request_id);
- lock.unlock();
+
+ custom_filter_ptr->threads_waiting_for_js_responses_--;
return js_response_status;
}
+std::unique_lock<std::mutex>
+CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses() {
+  ScopeLogger("name_: [%s], listener_name_: [%s]", name_.c_str(), listener_name_.c_str());
+
+  /*
+   * threads_waiting_for_js_responses_ may only be read while holding
+   * request_id_to_js_response_mutex_, so the mutex is taken first and the
+   * counter inspected afterwards.
+   */
+  std::unique_lock<std::mutex> lock{request_id_to_js_response_mutex_};
+  LoggerD("threads_waiting_for_js_responses_: [%d], lock acquired: [%s]",
+          threads_waiting_for_js_responses_,
+          0 == threads_waiting_for_js_responses_ ? "true" : "false");
+  if (0 == threads_waiting_for_js_responses_) {
+    // No thread is sleeping, waiting for a JS response: transfer the
+    // still-owned lock to the caller.
+    return lock;
+  }
+
+  // At least one thread is waiting for JS; `lock` is released when it goes
+  // out of scope and a default-constructed (empty) unique_lock is returned,
+  // which converts to false in the callers' checks.
+  return {};
+}
+
+std::unique_lock<std::mutex> CustomFilter::LockRequestIdToJSResponseMutexIfFilterIsValid(
+    CustomFilter* custom_filter_ptr) {
+  ScopeLogger();
+
+  /*
+   * Holding valid_filters_mutex_ for the whole lookup-then-lock sequence
+   * prevents custom_filter_ptr from being removed from valid_filters_
+   * between the membership check and locking its
+   * request_id_to_js_response_mutex_.
+   */
+  std::lock_guard<std::mutex> lock{CustomFilter::valid_filters_mutex_};
+  if (CustomFilter::valid_filters_.count(custom_filter_ptr)) {
+    LoggerD("Filter is valid");
+    return std::unique_lock<std::mutex>{custom_filter_ptr->request_id_to_js_response_mutex_};
+  }
+
+  // The pointer is not a registered filter (already unregistered or never
+  // registered): return an empty unique_lock, which converts to false.
+  LoggerD("Filter is invalid");
+  return {};
+}
+
+bool CustomFilter::TryLockAllFilters(
+    std::vector<std::unique_lock<std::mutex>>* output_filter_locks) {
+  ScopeLogger();
+
+  // Lock order is valid_filters_mutex_ first, then the per-filter
+  // request_id_to_js_response_mutex_ - the same order used by
+  // LockRequestIdToJSResponseMutexIfFilterIsValid(), so the two helpers
+  // cannot deadlock against each other.
+  std::lock_guard<std::mutex> valid_filters_lock{CustomFilter::valid_filters_mutex_};
+
+  for (auto* filter : CustomFilter::valid_filters_) {
+    auto filter_lock = filter->LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+    if (filter_lock) {
+      LoggerD("[%s] CustomFilter locked", filter->name_.c_str());
+      output_filter_locks->push_back(std::move(filter_lock));
+    } else {
+      // This filter has a thread waiting for a JS response. Clearing the
+      // vector destroys (and thereby unlocks) every lock acquired so far,
+      // leaving it empty as documented in the header.
+      LoggerD("[%s] CustomFilter is waiting for JS responses and cannot be locked",
+              filter->name_.c_str());
+      output_filter_locks->clear();
+      return false;
+    }
+  }
+
+  // Every valid filter is now locked; the locks are held until the caller
+  // destroys or clears *output_filter_locks.
+  return true;
+}
+
} // namespace pipeline
} // namespace ml
} // namespace extension
\ No newline at end of file
static const int kCustomFilterIgnoreData;
static const int kCustomFilterSuccess;
+  /*
+   * If any filter is currently processing data, this function returns
+   * "false" and leaves the vector pointed to by output_filter_locks empty.
+   * If no filter is currently processing data, it returns "true" and locks
+   * the request_id_to_js_response_mutex_ of each valid custom filter, to
+   * prevent JS custom filter callbacks from being executed.
+   */
+ static bool TryLockAllFilters(std::vector<std::unique_lock<std::mutex>>* output_filter_locks);
+
private:
CustomFilter(const std::string& name, const std::string& listener_name,
TensorsInfo* input_tensors_info, TensorsInfo* output_tensors_info,
int getRequestId();
+  /*
+   * If no CustomFilter thread is sleeping, waiting for responses from JS,
+   * this function returns request_id_to_js_response_mutex_ locked (wrapped
+   * in a std::unique_lock), thus preventing CustomFilterListener, called in
+   * newly spawned threads, from incrementing
+   * threads_waiting_for_js_responses_.
+   */
+ std::unique_lock<std::mutex> LockRequestIdToJSResponseMutexIfFilterIsNotWaitingForJSResponses();
+
const std::string name_;
const std::string listener_name_;
TensorsInfo* input_tensors_info_ptr_;
std::mutex request_id_to_js_response_mutex_;
std::unordered_map<int, int> request_id_to_js_response_status_;
- std::condition_variable cv_;
+ /*
+ * We keep track of the number of threads waiting for JS responses
+ * because Unregister() MUSTN'T be called when at least one thread
+ * is sleeping (calling it could result in a deadlock).
+ *
+ * This variable should only be accessed by a thread that currently
+ * locks request_id_to_js_response_mutex_.
+ */
+ int threads_waiting_for_js_responses_;
+ std::condition_variable cv_; // used with request_id_to_js_response_mutex_
+
std::thread::id main_thread_id_;
+
+ static std::unique_lock<std::mutex> LockRequestIdToJSResponseMutexIfFilterIsValid(CustomFilter*);
+ static std::mutex valid_filters_mutex_;
+ static std::unordered_set<CustomFilter*> valid_filters_;
};
} // namespace pipeline