From bd5ca868301004bf920136d158e731c5f6d082ff Mon Sep 17 00:00:00 2001 From: Lomtev Dmytro Date: Wed, 31 May 2017 17:23:11 +0300 Subject: [PATCH] PolicyClient multithreading refatored. --- device_core/iotivity_lib/inc/policy_client.h | 7 +- device_core/iotivity_lib/src/policy_client.cpp | 205 ++++++++++++++++--------- 2 files changed, 137 insertions(+), 75 deletions(-) diff --git a/device_core/iotivity_lib/inc/policy_client.h b/device_core/iotivity_lib/inc/policy_client.h index 68fc91a..8e8dcf6 100644 --- a/device_core/iotivity_lib/inc/policy_client.h +++ b/device_core/iotivity_lib/inc/policy_client.h @@ -48,12 +48,7 @@ public: * @param policy [in] policy in JSON format */ void postPolicy(const std::string& did, const std::string& agent, const std::string& policy); -private: - /** - * @brief foundResourceCb callback processing resource found events - * @param found_resource [in] found resource - */ - void foundResourceCb(std::shared_ptr found_resource); + }; } // namespace NetworkManager diff --git a/device_core/iotivity_lib/src/policy_client.cpp b/device_core/iotivity_lib/src/policy_client.cpp index 05503a3..9792cab 100644 --- a/device_core/iotivity_lib/src/policy_client.cpp +++ b/device_core/iotivity_lib/src/policy_client.cpp @@ -14,12 +14,82 @@ namespace PH = std::placeholders; namespace NetworkManager { +struct FindPolicyCallback +{ + std::shared_ptr resource; + bool fired = false; + std::condition_variable signal; + std::mutex mtx; + + void signalize() + { + fired = true; + signal.notify_one(); + } + + static void call(std::weak_ptr ctx, std::shared_ptr found_resource) + { + if (auto p = ctx.lock()) + { + std::unique_lock lock(p->mtx); + + if(!p->resource && found_resource) + { + for(auto &resource_type : found_resource->getResourceTypes()) + { + if(resource_type == "core.policy") + { + if (found_resource->connectivityType() & OCConnectivityType::CT_ADAPTER_TCP) + { + p->resource = found_resource; + LOG_I(TAG, "Address of selected policy resource: %s", found_resource->host().c_str()); + p->signalize(); + } + } + } + } + } + } +}; + +struct PostPolicyCallback +{ + bool fired = false; + std::condition_variable signal; + int ecode = 0; + + void signalize() + { + fired = true; + signal.notify_one(); + } + + static void call(std::weak_ptr ctx, const HeaderOptions& hopts, const OCRepresentation& rep, const int eCode) + { + if (auto p = ctx.lock()) + { + LOG_I(TAG, "Post policy return code=%d", eCode); + p->ecode = eCode; + p->signalize(); + } + } +}; + PolicyClient::PolicyClient(const std::string& host) { std::string requestURI{OC_RSRVD_WELL_KNOWN_URI}; - std::unique_lock lock(report_mutex); - OCPlatform::findResource(host, requestURI, CT_DEFAULT, std::bind(&PolicyClient::foundResourceCb, this, PH::_1)); - signal.wait_for(lock, std::chrono::seconds(3)); + std::shared_ptr callback = std::make_shared(); + std::unique_lock lock(callback->mtx); + OCPlatform::findResource(host, requestURI, CT_DEFAULT, std::bind(&FindPolicyCallback::call, std::weak_ptr(callback), PH::_1)); + callback->signal.wait_for(lock, std::chrono::seconds(3)); + if (!callback->fired) + { + LOG_E(TAG, "PolicyClient found callback not called."); + } + else + { + resource = callback->resource; + } } PolicyClient::operator bool() const @@ -27,55 +97,78 @@ PolicyClient::operator bool() const return (bool)resource; } -std::string PolicyClient::getPolicy(const std::string& did, const std::string& agent) +struct GetPolicyCallback { - LOG_I(TAG, "PolicyClient::getPolicy(%s, %s)", did.c_str(), agent.c_str()); - std::unique_lock lock(report_mutex); - QueryParamsMap query{{"did", did}, {"agent", agent}}; - fired = false; - lastResult = OC_STACK_OK; + std::string policy; + bool fired = false; + std::condition_variable signal; + std::mutex mtx; + int ecode = OC_STACK_OK; - if(resource) + void signalize() + { + fired = true; + signal.notify_one(); + } + + static void call(std::weak_ptr ctx, const HeaderOptions& hopts, const OCRepresentation& rep, const int eCode) { - auto result = resource->get(query, [this](const HeaderOptions& hopts, const OCRepresentation& rep, const int eCode) + if (auto p = ctx.lock()) + { + std::unique_lock lock(p->mtx); + + try { - try + if(eCode == OC_STACK_OK) { - if(eCode == OC_STACK_OK) - { - rep.getValue("policy", policy); - } - else - { - LOG_E(TAG, "Error (PolicyClient::getPolicy): code=%d", eCode); - lastResult = static_cast(eCode); - } + rep.getValue("policy", p->policy); } - catch(std::exception& e) + else { - LOG_E(TAG, "Exception (PolicyClient::getPolicy): %s", e.what()); + LOG_E(TAG, "Error (PolicyClient::getPolicy): code=%d", eCode); + p->ecode = eCode; } + } + catch(std::exception& e) + { + LOG_E(TAG, "Exception (PolicyClient::getPolicy): %s", e.what()); + } - fired = true; - signal.notify_one(); - }); + p->signalize(); + } + } +}; + +std::string PolicyClient::getPolicy(const std::string& did, const std::string& agent) +{ + LOG_I(TAG, "PolicyClient::getPolicy(%s, %s)", did.c_str(), agent.c_str()); + + if(resource) + { + QueryParamsMap query{{"did", did}, {"agent", agent}}; + std::shared_ptr callback = std::make_shared(); + std::unique_lock lock(callback->mtx); + + auto result = resource->get(query, std::bind(&GetPolicyCallback::call, std::weak_ptr(callback), PH::_1, PH::_2, PH::_3)); if (OC_STACK_OK != result) { throw IoTInternalError("PolicyClient::getPolicy error", result); } - signal.wait_for(lock, std::chrono::seconds(3), [this] { return fired; }); + callback->signal.wait_for(lock, std::chrono::seconds(3), [&callback] { return callback->fired; }); - if (!fired) + if (!callback->fired) { throw IoTInternalError("PolicyClient::getPolicy callback not called", EC_UNAUTHORIZED); } - if (lastResult !=OC_STACK_OK) + if (callback->ecode !=OC_STACK_OK) { - throw IoTInternalError("PolicyClient::getPolicy error", lastResult); + throw IoTInternalError("PolicyClient::getPolicy callback error", callback->ecode); } + + policy = callback->policy; } return policy; @@ -94,56 +187,30 @@ void PolicyClient::postPolicy(const std::string& did, const std::string& agent, QueryParamsMap query{{"did", did}, {"agent", agent}}; - std::unique_lock lock(report_mutex); - fired = false; - auto result = resource->post("core.policy", DEFAULT_INTERFACE, repr, query, - [this] (const HeaderOptions & /*ho*/, const OCRepresentation &rep, const int eCode) - { - LOG_I(TAG, "Post policy return code=%d", eCode); - fired = true; - signal.notify_one(); - } - ); + std::shared_ptr callback = std::make_shared(); + std::mutex mtx; + std::unique_lock lock(mtx); + + auto result = resource->post("core.policy", + DEFAULT_INTERFACE, + repr, + query, + std::bind(&PostPolicyCallback::call, std::weak_ptr(callback), PH::_1, PH::_2, PH::_3)); if (OC_STACK_OK != result) { throw IoTInternalError("PolicyClient::postPolicy error", result); } - signal.wait_for(lock, std::chrono::seconds(1), [this] { return fired; }); + callback->signal.wait_for(lock, std::chrono::seconds(3), [&callback] { return callback->fired; }); - if (!fired) + if (!callback->fired) { throw IoTInternalError("PolicyClient::postPolicy callback not called", EC_UNAUTHORIZED); } -} - -void PolicyClient::foundResourceCb(std::shared_ptr found_resource) -{ - try - { - std::unique_lock lock(report_mutex); - - if(!resource && found_resource) - { - for(auto &resource_type : found_resource->getResourceTypes()) - { - if(resource_type == "core.policy") - { - if (found_resource->connectivityType() & OCConnectivityType::CT_ADAPTER_TCP) - { - resource = found_resource; - LOG_I(TAG, "Address of selected policy resource: %s", found_resource->host().c_str()); - signal.notify_one(); - } - } - } - } - - } - catch(std::exception& e) + if (OC_STACK_OK != callback->ecode && OC_STACK_RESOURCE_CHANGED != callback->ecode) { - LOG_E(TAG, "Exception in foundResource: %s", e.what()); + throw IoTInternalError("PolicyClient::postPolicy callback error", callback->ecode); } } -- 2.7.4