From 58ffd85c940411edac266104298bce58ddc09bf6 Mon Sep 17 00:00:00 2001 From: "i.metelytsia" Date: Fri, 21 Jul 2017 11:32:21 +0300 Subject: [PATCH] API for discovery topics was added. This API was used for demo presentation because server does not see the newly created topics and can not connect to them. --- device_core/iotivity_lib/inc/mqhandler.h | 17 ++++++++ device_core/iotivity_lib/inc/resource_callbacks.h | 23 +++++++++++ device_core/iotivity_lib/src/mqhandler.cpp | 45 ++++++++++++++++++++++ .../iotivity_lib/src/resource_callbacks.cpp | 17 ++++++++ device_core/nmdaemon/policyhandlermq.cpp | 18 +++++++++ 5 files changed, 120 insertions(+) diff --git a/device_core/iotivity_lib/inc/mqhandler.h b/device_core/iotivity_lib/inc/mqhandler.h index 3c81b37..7b2e28e 100644 --- a/device_core/iotivity_lib/inc/mqhandler.h +++ b/device_core/iotivity_lib/inc/mqhandler.h @@ -45,6 +45,13 @@ public: void subscribe(const std::string& subTopic, OC::ObserveCallback subscribeCB); /** + * @brief subscribe to existing topic + * @param subTopic sub topic that shoud be after root topic (Ex. '/srv/policy') + * @param subscribeCB method to that should be called after publish + */ + void subscribeToExistingTopic(const std::string& subTopic, OC::ObserveCallback subscribeCB); + + /** * @brief unsubscribe method to unsubscribe from topic * @param subTopic sub topic that shoud be after root topic (Ex. '/srv/policy') */ @@ -61,6 +68,16 @@ public: * @return resource object that represent topic */ OC::OCResource::Ptr getTopic(const std::string& subTopic); + + /** + * @brief Discovery Topic from MQ broker + * Call this method to find topic + * + * @param subTopic sub topic that shoud be after root topic (Ex. '/srv/policy') + * @return resource object that represent topic + */ + OC::OCResource::Ptr discoveryTopic(const std::string& subTopic); + private: /** diff --git a/device_core/iotivity_lib/inc/resource_callbacks.h b/device_core/iotivity_lib/inc/resource_callbacks.h index 75d29e5..56cd383 100644 --- a/device_core/iotivity_lib/inc/resource_callbacks.h +++ b/device_core/iotivity_lib/inc/resource_callbacks.h @@ -210,6 +210,29 @@ struct MqCreateTopicCallback : public CallbackBase std::shared_ptr resource); }; +struct MqDiscoveryTopicCallback : public CallbackBase +{ + typedef std::shared_ptr Sptr; + std::string topic; + std::shared_ptr resource; + + /** + * @brief MqDiscoveryTopicCallback constructor + * @param top [in] mq topic to search for + */ + MqDiscoveryTopicCallback(const std::string top) + : CallbackBase(), topic(top) {} + + /** + * @brief call - callback routine, called from iotivity framework + * @param ctx [in] this struct self weak reference + * @param resource [in] resource founded by iotivity + */ + static void call(std::weak_ptr ctx, + const int errorCode, const std::string& topic, + std::shared_ptr resource); +}; + /** * @brief bind_callback - helper function to simplify callback binding * @param ptr [in] weak reference to callback struct diff --git a/device_core/iotivity_lib/src/mqhandler.cpp b/device_core/iotivity_lib/src/mqhandler.cpp index c845194..2483a80 100644 --- a/device_core/iotivity_lib/src/mqhandler.cpp +++ b/device_core/iotivity_lib/src/mqhandler.cpp @@ -89,6 +89,36 @@ OC::OCResource::Ptr MqHandler::getTopic(const std::string& subTopic) return nullptr; } +OC::OCResource::Ptr MqHandler::discoveryTopic(const std::string& subTopic) +{ + std::string topic = DEFAULT_MQ_BROKER_URI_ROOT + subTopic; + QueryParamsMap query; + MqDiscoveryTopicCallback::Sptr callback = std::make_shared(topic); + + guardErrorCode(brokerResource->discoveryMQTopics( + query, + bind_callback(callback, PH::_1, PH::_2, PH::_3), + QualityOfService::HighQos), "discoveryMQTopics()"); + + if (!callback->wait()) + { + LOG_E(TAG, "discoveryMQTopic() cb not called."); + return nullptr; + } + + switch (callback->errorCode) + { + // Topic was discovered + case OC_STACK_OK: + return callback->resource; + + default: + guardErrorCode((OCStackResult)callback->errorCode, "discoveryMQTopic() cb"); + } + LOG_E(TAG, "MqHandler::discoveryTopic() failed"); + return nullptr; +} + void MqHandler::publish(const std::string& subTopic, OCRepresentation rep) { OC::OCResource::Ptr topicResource = getTopic(subTopic); @@ -124,6 +154,21 @@ void MqHandler::subscribe(const std::string& subTopic, ObserveCallback subscribe subscribeCB, QualityOfService::HighQos), "subscribeMQTopic()"); } +void MqHandler::subscribeToExistingTopic(const std::string& subTopic, OC::ObserveCallback subscribeCB) +{ + OC::OCResource::Ptr topicResource = discoveryTopic(subTopic); + + if (!topicResource) + return; + { + std::unique_lock lock(handler_mutex); + topicCache[subTopic] = topicResource; + } + QueryParamsMap query; + guardErrorCode(topicResource->subscribeMQTopic(ObserveType::Observe, query, + subscribeCB, QualityOfService::HighQos), "subscribeMQTopic()"); +} + void MqHandler::unsubscribe(const std::string& subTopic) { std::unique_lock lock(handler_mutex); diff --git a/device_core/iotivity_lib/src/resource_callbacks.cpp b/device_core/iotivity_lib/src/resource_callbacks.cpp index 9466d5c..6cd5250 100644 --- a/device_core/iotivity_lib/src/resource_callbacks.cpp +++ b/device_core/iotivity_lib/src/resource_callbacks.cpp @@ -152,5 +152,22 @@ void MqCreateTopicCallback::call(std::weak_ptr ctx, } } +void MqDiscoveryTopicCallback::call(std::weak_ptr ctx, + const int errorCode, const std::string& topic, + std::shared_ptr resource) +{ + if (auto p = ctx.lock()) + { + std::unique_lock ilock(p->mtx); + + if(resource->uri() == p->topic) + { + p->errorCode = errorCode; + p->resource = resource; + p->signalize(); + } + } +} + } // namespace NetworkManager diff --git a/device_core/nmdaemon/policyhandlermq.cpp b/device_core/nmdaemon/policyhandlermq.cpp index 0b85e52..729f21c 100644 --- a/device_core/nmdaemon/policyhandlermq.cpp +++ b/device_core/nmdaemon/policyhandlermq.cpp @@ -31,3 +31,21 @@ bool PolicyHandlerMQ::init() LOG_D(TAG, "Suscribed to topic [%s]", topic.c_str()); return true; } + +/** + * This method was added for demo presentation only. + * For some reason the server does not see the newly created topics + * and can not connect to them. + * So for demo presentation an existing topic is used. + */ +//bool PolicyHandlerMQ::init() +//{ +// auto iotivity = NetworkManager::IoTivity::getInstance(); +// auto handler = iotivity->getMqHandler(); +// std::string parentTopic = "/2a12a5a1-dad1-438d-8141-dbe1629ea93e"; +// std::string topic = parentTopic + "/policy"; +// handler->subscribeToExistingTopic(topic, std::bind(&PolicyHandler::observeCallback, this, PH::_1, PH::_2, PH::_3, PH::_4)); + +// LOG_D(TAG, "Suscribed to topic [%s]", topic.c_str()); +// return true; +//} -- 2.7.4