Added MessageQueue(CoAP over MQ) API about Cloud Client side.
authorjihwan.seo <jihwan.seo@samsung.com>
Mon, 27 Jun 2016 23:39:32 +0000 (08:39 +0900)
committerAshok Babu Channa <ashok.channa@samsung.com>
Fri, 22 Jul 2016 14:07:39 +0000 (14:07 +0000)
- all APIs related MQ is called from OCResource.
  (MQ Broker and Topic is handled as Resource)
- MQ publisher can use publishMQTopic API
- MQ subscriber can use subscribeMQTopic / unsubscribeMQTopic
                        / requestMQPublish API
- All MQ type can use discoveryMQTopics / createMQTopic API
- publish message is sent with POST base on OCF spec
- createTopic message is sent with PUT base on OCF spec
- discoveryMQTopics / createMQTopic API is implemented through new path
  of wrapper class. because their callbacks is different with others
- Local MQ(D2D) is not support. it means there is no local MQ broker for D2D scenario
- refer : https://wiki.iotivity.org/message_queue_mq_for_publish-subscribe_interactions

Change-Id: Ibc1556a389f408634832149f646cd65bf8eda154
Signed-off-by: jihwan.seo <jihwan.seo@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/8975
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Jaehong Jo <jaehong.jo@samsung.com>
Reviewed-by: Ashok Babu Channa <ashok.channa@samsung.com>
14 files changed:
resource/csdk/stack/include/internal/ocresourcehandler.h
resource/csdk/stack/include/ocstackconfig.h
resource/csdk/stack/include/octypes.h
resource/csdk/stack/src/ocresource.c
resource/csdk/stack/src/ocstack.c
resource/csdk/stack/test/stacktests.cpp
resource/include/IClientWrapper.h
resource/include/InProcClientWrapper.h
resource/include/OCApi.h
resource/include/OCResource.h
resource/include/OCSerialization.h
resource/include/OutOfProcClientWrapper.h
resource/src/InProcClientWrapper.cpp
resource/src/OCResource.cpp

index 8b77e34..ba25f5d 100644 (file)
@@ -72,8 +72,14 @@ typedef enum
     OC_PRESENCE,
     #endif
 
+#ifdef MQ_BROKER
+    /** "/oic/ps" .*/
+    OC_MQ_BROKER_URI,
+#endif
+
     /** Max items in the list */
     OC_MAX_VIRTUAL_RESOURCES    //<s Max items in the list
+
 } OCVirtualResources;
 
 /**
index f5ecebb..96afa7a 100644 (file)
@@ -63,7 +63,7 @@
  *  Maximum number of vendor specific header options an application can set or receive
  *  in PDU
  */
-#define MAX_HEADER_OPTIONS (2)
+#define MAX_HEADER_OPTIONS (100)
 
 /**
  *  Maximum Length of the vendor specific header option
index 365b75c..b132d6e 100644 (file)
@@ -68,6 +68,12 @@ extern "C" {
 /** Gateway URI.*/
 #define OC_RSRVD_GATEWAY_URI                  "/oic/gateway"
 #endif
+
+#ifdef WITH_MQ
+/** MQ Broker URI.*/
+#define OC_RSRVD_WELL_KNOWN_MQ_URI            "/.well-known/ocf/ps"
+#endif
+
 #ifdef WITH_PRESENCE
 
 /** Presence URI through which the OIC devices advertise their presence.*/
@@ -149,6 +155,14 @@ extern "C" {
 /** To represent resource type with RES.*/
 #define OC_RSRVD_RESOURCE_TYPE_RES    "oic.wk.res"
 
+#ifdef WITH_MQ
+/** To represent content type with MQ Broker.*/
+#define OC_RSRVD_RESOURCE_TYPE_MQ_BROKER     "ocf.wk.ps"
+
+/** To represent content type with MQ Topic.*/
+#define OC_RSRVD_RESOURCE_TYPE_MQ_TOPIC      "ocf.wk.ps.topic"
+#endif
+
 /** To represent interface.*/
 #define OC_RSRVD_INTERFACE              "if"
 
@@ -718,16 +732,16 @@ typedef enum
     /** When this bit is set, the resource is allowed to be discovered only
      *  if discovery request contains an explicit querystring.
      *  Ex: GET /oic/res?rt=oic.sec.acl */
-    OC_EXPLICIT_DISCOVERABLE   = (1 << 5),
+    OC_EXPLICIT_DISCOVERABLE   = (1 << 5)
 
 #ifdef WITH_MQ
     /** When this bit is set, the resource is allowed to be published */
-    OC_MQ_PUBLISHER     = (1 << 6),
+    ,OC_MQ_PUBLISHER     = (1 << 6)
 #endif
 
 #ifdef MQ_BROKER
     /** When this bit is set, the resource is allowed to be notified as MQ broker.*/
-    OC_MQ_BROKER        = (1 << 7),
+    ,OC_MQ_BROKER        = (1 << 7)
 #endif
 } OCResourceProperty;
 
@@ -855,7 +869,14 @@ typedef enum
     OC_OBSERVE_DEREGISTER = 1,
 
     /** Others. */
-    OC_OBSERVE_NO_OPTION = 2
+    OC_OBSERVE_NO_OPTION = 2,
+
+//#ifdef WITH_MQ
+    OC_MQ_SUBSCRIBER = 3,
+
+    OC_MQ_UNSUBSCRIBER = 4,
+//#endif
+
 } OCObserveAction;
 
 
index 2295d71..44a2135 100755 (executable)
@@ -231,6 +231,13 @@ static OCVirtualResources GetTypeOfVirtualURI(const char *uriInRequest)
         return OC_PRESENCE;
     }
 #endif //WITH_PRESENCE
+
+#ifdef MQ_BROKER
+    else if (0 == strcmp(uriInRequest, OC_RSRVD_WELL_KNOWN_MQ_URI))
+    {
+        return OC_MQ_BROKER_URI;
+    }
+#endif //MQ_BROKER
     return OC_UNKNOWN_URI;
 }
 
@@ -721,7 +728,11 @@ static OCStackResult HandleVirtualResource (OCServerRequest *request, OCResource
     OCVirtualResources virtualUriInRequest = GetTypeOfVirtualURI (request->resourceUrl);
 
     // Step 1: Generate the response to discovery request
-    if (virtualUriInRequest == OC_WELL_KNOWN_URI)
+    if (virtualUriInRequest == OC_WELL_KNOWN_URI
+#ifdef MQ_BROKER
+            || virtualUriInRequest == OC_MQ_BROKER_URI
+#endif
+            )
     {
         if (request->method == OC_REST_PUT || request->method == OC_REST_POST || request->method == OC_REST_DELETE)
         {
@@ -759,10 +770,19 @@ static OCStackResult HandleVirtualResource (OCServerRequest *request, OCResource
 
                 if (!resourceTypeQuery && interfaceQuery && (0 == strcmp(interfaceQuery, OC_RSRVD_INTERFACE_LL)))
                 {
+                    OCResourceProperty prop = OC_DISCOVERABLE;
+#ifdef MQ_BROKER
+                    if (OC_MQ_BROKER_URI == virtualUriInRequest)
+                    {
+                        prop = OC_MQ_BROKER;
+                    }
+#endif
+
                     for (; resource && discoveryResult == OC_STACK_OK; resource = resource->next)
                     {
                         bool result = false;
-                        if (resource->resourceProperties & OC_DISCOVERABLE)
+
+                        if (resource->resourceProperties & prop)
                         {
                             result = true;
                         }
index 40bfa0b..5cc4c63 100644 (file)
@@ -119,6 +119,10 @@ OCResource *headResource = NULL;
 static OCResource *tailResource = NULL;
 static OCResourceHandle platformResource = {0};
 static OCResourceHandle deviceResource = {0};
+#ifdef MQ_BROKER
+static OCResourceHandle brokerResource = {0};
+#endif
+
 #ifdef WITH_PRESENCE
 static OCPresenceState presenceState = OC_PRESENCE_UNINITIALIZED;
 static PresenceResource presenceResource = {0};
@@ -635,7 +639,6 @@ static OCStackResult CAResultToOCStackResult(CAResult_t caResult)
 OCStackResult CAResponseToOCStackResult(CAResponseResult_t caCode)
 {
     OCStackResult ret = OC_STACK_ERROR;
-
     switch(caCode)
     {
         case CA_CREATED:
@@ -1284,6 +1287,12 @@ void OCHandleResponse(const CAEndpoint_t* endPoint, const CAResponseInfo_t* resp
                     {
                         type = PAYLOAD_TYPE_DISCOVERY;
                     }
+#ifdef WITH_MQ
+                    else if (strcmp(cbNode->requestUri, OC_RSRVD_WELL_KNOWN_MQ_URI) == 0)
+                    {
+                        type = PAYLOAD_TYPE_DISCOVERY;
+                    }
+#endif
                     else if (strcmp(cbNode->requestUri, OC_RSRVD_DEVICE_URI) == 0)
                     {
                         type = PAYLOAD_TYPE_DEVICE;
@@ -1380,7 +1389,6 @@ void OCHandleResponse(const CAEndpoint_t* endPoint, const CAResponseInfo_t* resp
                             (observationOption << 8) | optionData[i];
                     }
                     response.sequenceNumber = observationOption;
-
                     response.numRcvdVendorSpecificHeaderOptions = responseInfo->info.numOptions - 1;
                     start = 1;
                 }
@@ -3222,7 +3230,7 @@ OCStackResult OCCreateResource(OCResourceHandle *handle,
         return OC_STACK_INVALID_PARAM;
     }
 
-    if(!resourceInterfaceName || strlen(resourceInterfaceName) == 0)
+    if (!resourceInterfaceName || strlen(resourceInterfaceName) == 0)
     {
         resourceInterfaceName = OC_RSRVD_INTERFACE_DEFAULT;
     }
@@ -4245,6 +4253,9 @@ void deleteAllResources()
     }
     memset(&platformResource, 0, sizeof(platformResource));
     memset(&deviceResource, 0, sizeof(deviceResource));
+#ifdef MQ_BROKER
+    memset(&brokerResource, 0, sizeof(brokerResource));
+#endif
 
     SRMDeInitSecureResources();
 
index 7afedb5..9bafe8e 100644 (file)
@@ -464,7 +464,7 @@ TEST(StackResource, CreateResourceBadParams)
                                             "/a/led",
                                             0,
                                             NULL,
-                                            128));// invalid bitmask for OCResourceProperty
+                                            255));// invalid bitmask for OCResourceProperty
 
     EXPECT_EQ(OC_STACK_OK, OCStop());
 }
index 2bcc0f6..43e52c1 100644 (file)
@@ -126,6 +126,20 @@ namespace OC
                                               const OCPrm_t& pmSel, const std::string& pinNumber,
                                               DirectPairingCallback& resultCallback) = 0;
 
+#ifdef WITH_MQ
+        virtual OCStackResult ListenForMQTopic(
+            const OCDevAddr& devAddr,
+            const std::string& resourceUri,
+            const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+            FindCallback& callback, QualityOfService QoS) = 0;
+
+        virtual OCStackResult PutMQTopicRepresentation(
+            const OCDevAddr& devAddr,
+            const std::string& uri,
+            const OCRepresentation& rep,
+            const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+            MQCreateTopicCallback& callback, QualityOfService QoS) = 0;
+#endif
         virtual ~IClientWrapper(){}
     };
 }
index 15eac59..e97c990 100644 (file)
@@ -99,6 +99,16 @@ namespace OC
             DirectPairingContext(DirectPairingCallback cb) : callback(cb){}
 
         };
+
+#ifdef WITH_MQ
+        struct CreateMQTopicContext
+        {
+            MQCreateTopicCallback callback;
+            std::weak_ptr<IClientWrapper> clientWrapper;
+            CreateMQTopicContext(MQCreateTopicCallback cb, std::weak_ptr<IClientWrapper> cw)
+                : callback(cb), clientWrapper(cw){}
+        };
+#endif
     }
 
     class InProcClientWrapper : public IClientWrapper
@@ -185,6 +195,21 @@ namespace OC
         virtual OCStackResult DoDirectPairing(std::shared_ptr<OCDirectPairing> peer, const OCPrm_t& pmSel,
                 const std::string& pinNumber, DirectPairingCallback& resultCallback);
 
+#ifdef WITH_MQ
+        virtual OCStackResult ListenForMQTopic(
+            const OCDevAddr& devAddr,
+            const std::string& resourceUri,
+            const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+            FindCallback& callback, QualityOfService QoS);
+
+        virtual OCStackResult PutMQTopicRepresentation(
+            const OCDevAddr& devAddr,
+            const std::string& uri,
+            const OCRepresentation& rep,
+            const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+            MQCreateTopicCallback& callback, QualityOfService QoS);
+#endif
+
     private:
         void listeningFunc();
         std::string assembleSetResourceUri(std::string uri, const QueryParamsMap& queryParams);
index 75abb8b..1188db1 100644 (file)
@@ -294,6 +294,10 @@ namespace OC
 
     typedef std::function<void(const PairedDevices&)> GetDirectPairedCallback;
 
+    typedef std::function<void(const HeaderOptions&,
+                               const OCRepresentation&, const int,
+                               std::shared_ptr<OCResource>)> MQCreateTopicCallback;
+
 } // namespace OC
 
 #endif
index 536c50a..1e413a6 100644 (file)
@@ -543,6 +543,92 @@ namespace OC
         */
         std::string sid() const;
 
+#ifdef WITH_MQ
+        /**
+        * Function to discovery Topics from MQ Broker.
+        *
+        * @param queryParametersMap map which can have the query parameter name and value
+        * @param attributeHandler handles callback
+
+        * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
+        * @note OCStackResult is defined in ocstack.h.
+        *
+        */
+        OCStackResult discoveryMQTopics(const QueryParamsMap& queryParametersMap,
+                                        FindCallback attributeHandler);
+        /**
+        * Function to create Topic into MQ Broker.
+        * SubTopic is also created through this method.
+        *
+        * @param rep representation of the topic
+        * @param topicUri new uri of the topic which want to create
+        * @param queryParametersMap map which can have the query parameter name and value
+        * @param attributeHandler handles callback
+
+        * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
+        * @note OCStackResult is defined in ocstack.h.
+        *
+        */
+        OCStackResult createMQTopic(const OCRepresentation& rep,
+                                    const std::string& topicUri,
+                                    const QueryParamsMap& queryParametersMap,
+                                    MQCreateTopicCallback attributeHandler);
+#endif
+#ifdef MQ_SUBSCRIBER
+        /**
+        * Function to subscribe Topic to MQ Broker.
+        *
+        * @param observeType allows the client to specify how it wants to observe.
+        * @param queryParametersMap map which can have the query parameter name and value
+        * @param observeHandler handles callback
+
+        * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
+        * @note OCStackResult is defined in ocstack.h.
+        *
+        */
+        OCStackResult subscribeMQTopic(ObserveType observeType,
+                                       const QueryParamsMap& queryParametersMap,
+                                       ObserveCallback observeHandler);
+
+        /**
+        * Function to unsubscribe Topic to MQ Broker.
+        *
+        * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
+        * @note OCStackResult is defined in ocstack.h.
+        *
+        */
+        OCStackResult unsubscribeMQTopic();
+
+        /**
+        * Function to request publish to MQ publisher.
+        * Publisher can confirm the request message as key:"req_pub" and value:"true".
+        *
+        * @param queryParametersMap map which can have the query parameter name and value
+        * @param attributeHandler handles callback
+
+        * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
+        * @note OCStackResult is defined in ocstack.h.
+        *
+        */
+        OCStackResult requestMQPublish(const QueryParamsMap& queryParametersMap,
+                                       PostCallback attributeHandler);
+#endif
+#ifdef MQ_PUBLISHER
+        /**
+        * Function to publish Topic information into MQ Broker.
+        *
+        * @param rep representation of the topic
+        * @param queryParametersMap map which can have the query parameter name and value
+        * @param attributeHandler handles callback
+
+        * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
+        * @note OCStackResult is defined in ocstack.h.
+        *
+        */
+        OCStackResult publishMQTopic(const OCRepresentation& rep,
+                                     const QueryParamsMap& queryParametersMap,
+                                     PostCallback attributeHandler);
+#endif
         // overloaded operators allow for putting into a 'set'
         // the uniqueidentifier allows for putting into a hash
         bool operator==(const OCResource &other) const;
index 24cc059..693f546 100644 (file)
@@ -57,8 +57,9 @@ namespace OC
 
                         if (res->port != 0)
                         {
-                             m_devAddr.port = res->port;
+                            m_devAddr.port = res->port;
                         }
+
                         if (payload->baseURI)
                         {
                             OCDevAddr rdPubAddr = m_devAddr;
@@ -107,6 +108,44 @@ namespace OC
                 }
             }
 
+#ifdef WITH_MQ
+            ListenOCContainer(std::weak_ptr<IClientWrapper> cw,
+                                OCDevAddr& devAddr, OCRepPayload* payload)
+                                : m_clientWrapper(cw), m_devAddr(devAddr)
+            {
+                if (payload)
+                {
+                    char**topicList = nullptr;
+                    size_t dimensions[MAX_REP_ARRAY_DEPTH] = {0};
+                    OCRepPayloadGetStringArray(payload, "topiclist", &topicList, dimensions);
+
+                    for(size_t idx = 0; idx < dimensions[0]; idx++)
+                    {
+                        m_resources.push_back(std::shared_ptr<OC::OCResource>(
+                                new OC::OCResource(m_clientWrapper, m_devAddr,
+                                                   std::string(topicList[idx]),
+                                                   "",
+                                                   OC_OBSERVABLE,
+                                                   {OC_RSRVD_RESOURCE_TYPE_MQ_TOPIC},
+                                                   {DEFAULT_INTERFACE})));
+                    }
+                }
+            }
+
+            ListenOCContainer(std::weak_ptr<IClientWrapper> cw,
+                              OCDevAddr& devAddr, const std::string& topicUri)
+                              : m_clientWrapper(cw), m_devAddr(devAddr)
+            {
+                    m_resources.push_back(std::shared_ptr<OC::OCResource>(
+                            new OC::OCResource(m_clientWrapper, m_devAddr,
+                                               topicUri,
+                                               "",
+                                               OC_OBSERVABLE,
+                                               {OC_RSRVD_RESOURCE_TYPE_MQ_TOPIC},
+                                               {DEFAULT_INTERFACE})));
+            }
+#endif
+
             const std::vector<std::shared_ptr<OCResource>>& Resources() const
             {
                 return m_resources;
index ef79c5e..513156d 100644 (file)
@@ -144,6 +144,25 @@ namespace OC
                 const OCPrm_t& /*pmSel*/,
                 const std::string& /*pinNumber*/, DirectPairingCallback& /*resultCallback*/)
             {return OC_STACK_NOTIMPL;}
+
+#ifdef WITH_MQ
+        virtual OCStackResult ListenForMQTopic(const OCDevAddr& /*devAddr*/,
+                                               const std::string& /*resourceUri*/,
+                                               const QueryParamsMap& /*queryParams*/,
+                                               const HeaderOptions& /*headerOptions*/,
+                                               FindCallback& /*callback*/,
+                                               QualityOfService /*QoS*/)
+            {return OC_STACK_NOTIMPL;}
+
+        virtual OCStackResult PutMQTopicRepresentation(const OCDevAddr& /*devAddr*/,
+                                                       const std::string& /*uri*/,
+                                                       const OCRepresentation& /*rep*/,
+                                                       const QueryParamsMap& /*queryParams*/,
+                                                       const HeaderOptions& /*headerOptions*/,
+                                                       MQCreateTopicCallback& /*callback*/,
+                                                       QualityOfService /*QoS*/)
+            {return OC_STACK_NOTIMPL;}
+#endif
     };
 }
 
index 68e0330..8a892c6 100644 (file)
@@ -169,6 +169,7 @@ namespace OC
             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
             // loop to ensure valid construction of all resources
+
             for(auto resource : container.Resources())
             {
                 std::thread exec(context->callback, resource);
@@ -324,6 +325,101 @@ namespace OC
         }
         return result;
     }
+#ifdef WITH_MQ
+    OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
+            OCClientResponse* clientResponse)
+    {
+        ClientCallbackContext::ListenContext* context =
+            static_cast<ClientCallbackContext::ListenContext*>(ctx);
+
+        if (!clientResponse)
+        {
+            return OC_STACK_KEEP_TRANSACTION;
+        }
+
+        if (clientResponse->result != OC_STACK_OK)
+        {
+            oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
+                    << clientResponse->result
+                    << std::flush;
+
+            return OC_STACK_KEEP_TRANSACTION;
+        }
+
+        auto clientWrapper = context->clientWrapper.lock();
+        if (!clientWrapper)
+        {
+            oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
+                    << std::flush;
+            return OC_STACK_KEEP_TRANSACTION;
+        }
+
+        try{
+            ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+                                        (OCRepPayload *) clientResponse->payload);
+
+            // loop to ensure valid construction of all resources
+            for (auto resource : container.Resources())
+            {
+                std::thread exec(context->callback, resource);
+                exec.detach();
+            }
+        }
+        catch (std::exception &e){
+            oclog() << "Exception in listCallback, ignoring response: "
+                    << e.what() << std::flush;
+        }
+
+
+        return OC_STACK_KEEP_TRANSACTION;
+    }
+
+    OCStackResult InProcClientWrapper::ListenForMQTopic(
+            const OCDevAddr& devAddr,
+            const std::string& resourceUri,
+            const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+            FindCallback& callback, QualityOfService QoS)
+    {
+        oclog() << "ListenForMQTopic()" << std::flush;
+
+        if (!callback)
+        {
+            return OC_STACK_INVALID_PARAM;
+        }
+
+        ClientCallbackContext::ListenContext* context =
+            new ClientCallbackContext::ListenContext(callback, shared_from_this());
+        OCCallbackData cbdata;
+        cbdata.context = static_cast<void*>(context),
+        cbdata.cb      = listenMQCallback;
+        cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
+
+        std::string uri = assembleSetResourceUri(resourceUri, queryParams);
+
+        OCStackResult result = OC_STACK_ERROR;
+        auto cLock = m_csdkLock.lock();
+        if (cLock)
+        {
+            std::lock_guard<std::recursive_mutex> lock(*cLock);
+            OCHeaderOption options[MAX_HEADER_OPTIONS];
+            result = OCDoResource(
+                                  nullptr, OC_REST_GET,
+                                  uri.c_str(),
+                                  &devAddr, nullptr,
+                                  CT_DEFAULT,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  assembleHeaderOptions(options, headerOptions),
+                                  headerOptions.size());
+        }
+        else
+        {
+            delete context;
+        }
+
+        return result;
+    }
+#endif
 
     OCStackApplicationResult listenDeviceCallback(void* ctx,
                                                   OCDoHandle /*handle*/,
@@ -415,6 +511,135 @@ namespace OC
         }
     }
 
+#ifdef WITH_MQ
+    OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
+                    OCClientResponse* clientResponse)
+    {
+        ClientCallbackContext::CreateMQTopicContext* context =
+            static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
+        OCRepresentation rep;
+        HeaderOptions serverHeaderOptions;
+
+        if (!clientResponse)
+        {
+            return OC_STACK_DELETE_TRANSACTION;
+        }
+
+        std::string createdUri;
+        OCStackResult result = clientResponse->result;
+        if (OC_STACK_OK               == result ||
+            OC_STACK_RESOURCE_CREATED == result)
+        {
+            parseServerHeaderOptions(clientResponse, serverHeaderOptions);
+            try
+            {
+                rep = parseGetSetCallback(clientResponse);
+            }
+            catch(OC::OCException& e)
+            {
+                result = e.code();
+            }
+
+            bool isLocationOption = false;
+            for (auto headerOption : serverHeaderOptions)
+            {
+                if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
+                {
+                    createdUri += "/";
+                    createdUri += headerOption.getOptionData();
+                    if (!isLocationOption)
+                    {
+                        isLocationOption = true;
+                    }
+                }
+            }
+
+            if (!isLocationOption)
+            {
+                createdUri = clientResponse->resourceUri;
+            }
+        }
+
+        auto clientWrapper = context->clientWrapper.lock();
+
+        if (!clientWrapper)
+        {
+            oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
+                    << std::flush;
+            return OC_STACK_DELETE_TRANSACTION;
+        }
+
+        try{
+            if (OC_STACK_OK               == result ||
+                OC_STACK_RESOURCE_CREATED == result)
+            {
+                ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+                                            createdUri);
+                for (auto resource : container.Resources())
+                {
+                    std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
+                    exec.detach();
+                }
+            }
+            else
+            {
+                std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
+                exec.detach();
+            }
+        }
+        catch (std::exception &e){
+            oclog() << "Exception in createMQTopicCallback, ignoring response: "
+                    << e.what() << std::flush;
+        }
+        return OC_STACK_DELETE_TRANSACTION;
+    }
+
+    OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
+                const OCDevAddr& devAddr,
+                const std::string& uri,
+                const OCRepresentation& rep,
+                const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+                MQCreateTopicCallback& callback, QualityOfService QoS)
+    {
+        if (!callback)
+        {
+            return OC_STACK_INVALID_PARAM;
+        }
+        OCStackResult result;
+        ClientCallbackContext::CreateMQTopicContext* ctx =
+                new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
+        OCCallbackData cbdata;
+        cbdata.context = static_cast<void*>(ctx),
+        cbdata.cb      = createMQTopicCallback;
+        cbdata.cd      = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
+
+        std::string url = assembleSetResourceUri(uri, queryParams);
+
+        auto cLock = m_csdkLock.lock();
+
+        if (cLock)
+        {
+            std::lock_guard<std::recursive_mutex> lock(*cLock);
+            OCHeaderOption options[MAX_HEADER_OPTIONS];
+
+            result = OCDoResource(nullptr, OC_REST_PUT,
+                                  url.c_str(), &devAddr,
+                                  assembleSetResourcePayload(rep),
+                                  CT_DEFAULT,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  assembleHeaderOptions(options, headerOptions),
+                                  headerOptions.size());
+        }
+        else
+        {
+            delete ctx;
+            result = OC_STACK_ERROR;
+        }
+
+        return result;
+    }
+#endif
     OCStackApplicationResult getResourceCallback(void* ctx,
                                                  OCDoHandle /*handle*/,
         OCClientResponse* clientResponse)
index 7d680ca..2fef5c1 100644 (file)
@@ -580,6 +580,69 @@ std::string OCResource::sid() const
     return this->uniqueIdentifier().m_representation;
 }
 
+#ifdef WITH_MQ
+OCStackResult OCResource::discoveryMQTopics(const QueryParamsMap& queryParametersMap,
+                                            FindCallback attributeHandler)
+{
+    QualityOfService defaultQos = OC::QualityOfService::NaQos;
+    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
+    return checked_guard(m_clientWrapper.lock(),
+                            &IClientWrapper::ListenForMQTopic,
+                            m_devAddr, m_uri,
+                            queryParametersMap, m_headerOptions,
+                            attributeHandler, defaultQos);
+}
+
+OCStackResult OCResource::createMQTopic(const OCRepresentation& rep,
+                                        const std::string& topicUri,
+                                        const QueryParamsMap& queryParametersMap,
+                                        MQCreateTopicCallback attributeHandler)
+{
+    QualityOfService defaultQos = OC::QualityOfService::NaQos;
+    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
+    return checked_guard(m_clientWrapper.lock(), &IClientWrapper::PutMQTopicRepresentation,
+                         m_devAddr, topicUri, rep, queryParametersMap,
+                         m_headerOptions, attributeHandler, defaultQos);
+}
+#endif
+#ifdef MQ_SUBSCRIBER
+OCStackResult OCResource::subscribeMQTopic(ObserveType observeType,
+        const QueryParamsMap& queryParametersMap, ObserveCallback observeHandler)
+{
+    QualityOfService defaultQoS = OC::QualityOfService::NaQos;
+    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQoS);
+
+    return result_guard(observe(observeType, queryParametersMap, observeHandler, defaultQoS));
+}
+
+OCStackResult OCResource::unsubscribeMQTopic()
+{
+    QualityOfService defaultQoS = OC::QualityOfService::NaQos;
+    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQoS);
+    return result_guard(cancelObserve(defaultQoS));
+}
+
+OCStackResult OCResource::requestMQPublish(const QueryParamsMap& queryParametersMap,
+                                           PostCallback attributeHandler)
+{
+    OCRepresentation rep;
+    rep.setValue(std::string("req_pub"), std::string("true"));
+    QualityOfService defaultQos = OC::QualityOfService::NaQos;
+    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
+    return result_guard(post(rep, queryParametersMap, attributeHandler, defaultQos));
+}
+#endif
+#ifdef MQ_PUBLISHER
+OCStackResult OCResource::publishMQTopic(const OCRepresentation& rep,
+                                         const QueryParamsMap& queryParametersMap,
+                                         PostCallback attributeHandler)
+{
+    QualityOfService defaultQos = OC::QualityOfService::NaQos;
+    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
+    return result_guard(post(rep, queryParametersMap, attributeHandler, defaultQos));
+}
+#endif
+
 bool OCResource::operator==(const OCResource &other) const
 {
     return m_resourceId == other.m_resourceId;