Updated Message Queue for C++ API
authorjihwan.seo <jihwan.seo@samsung.com>
Wed, 3 Aug 2016 11:01:33 +0000 (20:01 +0900)
committerAshok Babu Channa <ashok.channa@samsung.com>
Wed, 17 Aug 2016 10:44:24 +0000 (10:44 +0000)
- add quality of service param in Cloud MQ client.
- combine callbacks for some APIs like 'createMQTopic','discoveryTopics'
  because both APIs need same callback format which give OCResource Obj.

Change-Id: Ie6eb734e10387216f6b1b6d1e5c122668ddcee6b
Signed-off-by: jihwan.seo <jihwan.seo@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/9991
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Jaehong Jo <jaehong.jo@samsung.com>
Reviewed-by: Ashok Babu Channa <ashok.channa@samsung.com>
android/android_api/base/jni/JniOcResource.cpp
resource/include/IClientWrapper.h
resource/include/InProcClientWrapper.h
resource/include/OCApi.h
resource/include/OCResource.h
resource/include/OutOfProcClientWrapper.h
resource/src/InProcClientWrapper.cpp
resource/src/OCResource.cpp

index 60d3161dfb26d6669348b0c0874321583fcc39c1..a90446a17840612e0917fdd6ca881b79bef85b3f 100644 (file)
@@ -457,7 +457,7 @@ OCStackResult JniOcResource::discoveryMQTopics(JNIEnv* env,
         onTopicFoundListener->foundTopicCallback(eCode, uri, resource);
     };
 
-    return m_sharedResource->discoveryMQTopics(queryParametersMap, findCallback);
+    return m_sharedResource->discoveryMQTopics(queryParametersMap, findCallback, QoS);
 }
 
 OCStackResult JniOcResource::createMQTopic(JNIEnv* env,
@@ -474,7 +474,7 @@ OCStackResult JniOcResource::createMQTopic(JNIEnv* env,
 
     return m_sharedResource->createMQTopic(representation, targetUri,
                                            queryParametersMap,
-                                           createCallback);
+                                           createCallback, QoS);
 }
 #endif
 #ifdef MQ_SUBSCRIBER
@@ -490,12 +490,12 @@ OCStackResult JniOcResource::subscribeMQTopic(JNIEnv* env,
     };
 
     return m_sharedResource->subscribeMQTopic(ObserveType::Observe, queryParametersMap,
-                                              subscribeCallback);
+                                              subscribeCallback, QoS);
 }
 
 OCStackResult JniOcResource::unsubscribeMQTopic(QualityOfService QoS)
 {
-    return m_sharedResource->unsubscribeMQTopic();
+    return m_sharedResource->unsubscribeMQTopic(QoS);
 }
 
 OCStackResult JniOcResource::requestMQPublish(JNIEnv* env,
@@ -509,7 +509,7 @@ OCStackResult JniOcResource::requestMQPublish(JNIEnv* env,
         onPostListener->onPostCallback(opts, rep, eCode);
     };
 
-    return m_sharedResource->requestMQPublish(queryParametersMap, postCallback);
+    return m_sharedResource->requestMQPublish(queryParametersMap, postCallback, QoS);
 }
 #endif
 #ifdef MQ_PUBLISHER
@@ -525,7 +525,7 @@ OCStackResult JniOcResource::publishMQTopic(JNIEnv* env, const OCRepresentation
     };
 
     return m_sharedResource->publishMQTopic(representation, queryParametersMap,
-                                            postCallback);
+                                            postCallback, QoS);
 }
 #endif
 
index 41a3689b4db997042c15f654af752d16e6fcde32..37adf61a1fe8252dc98ebf0a6c5848224e78be1f 100644 (file)
@@ -135,14 +135,14 @@ namespace OC
             const OCDevAddr& devAddr,
             const std::string& resourceUri,
             const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
-            FindCallback& callback, QualityOfService QoS) = 0;
+            MQTopicCallback& callback, QualityOfService QoS) = 0;
 
         virtual OCStackResult PutMQTopicRepresentation(
             const OCDevAddr& devAddr,
             const std::string& uri,
             const OCRepresentation& rep,
             const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
-            MQCreateTopicCallback& callback, QualityOfService QoS) = 0;
+            MQTopicCallback& callback, QualityOfService QoS) = 0;
 #endif
         virtual ~IClientWrapper(){}
     };
index f046c97e4a563b1ba185976923fedef8cee215ed..9565d3b81491eae63f19b849b0796df2af4bba24 100644 (file)
@@ -101,11 +101,11 @@ namespace OC
         };
 
 #ifdef WITH_MQ
-        struct CreateMQTopicContext
+        struct MQTopicContext
         {
-            MQCreateTopicCallback callback;
+            MQTopicCallback callback;
             std::weak_ptr<IClientWrapper> clientWrapper;
-            CreateMQTopicContext(MQCreateTopicCallback cb, std::weak_ptr<IClientWrapper> cw)
+            MQTopicContext(MQTopicCallback cb, std::weak_ptr<IClientWrapper> cw)
                 : callback(cb), clientWrapper(cw){}
         };
 #endif
@@ -204,14 +204,14 @@ namespace OC
             const OCDevAddr& devAddr,
             const std::string& resourceUri,
             const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
-            FindCallback& callback, QualityOfService QoS);
+            MQTopicCallback& callback, QualityOfService QoS);
 
         virtual OCStackResult PutMQTopicRepresentation(
             const OCDevAddr& devAddr,
             const std::string& uri,
             const OCRepresentation& rep,
             const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
-            MQCreateTopicCallback& callback, QualityOfService QoS);
+            MQTopicCallback& callback, QualityOfService QoS);
 #endif
 
     private:
index 0af7727580e58ed8b0ef7d00a207d2652e633c76..f2b06b3499056a161fae554cad74e4ac84c6f5cc 100644 (file)
@@ -297,9 +297,8 @@ namespace OC
 
     typedef std::function<void(const PairedDevices&)> GetDirectPairedCallback;
 
-    typedef std::function<void(const HeaderOptions&,
-                               const OCRepresentation&, const int,
-                               std::shared_ptr<OCResource>)> MQCreateTopicCallback;
+    typedef std::function<void(const int, const std::string&,
+                               std::shared_ptr<OCResource>)> MQTopicCallback;
 #ifdef RD_CLIENT
     typedef std::function<void(const OCRepresentation&, const int)> PublishResourceCallback;
 
index 1e413a68d05ff478bb90662d55139f59b3e6e267..2167ba61b0dea11f81ef9759dbbc3b8dcbb33c4a 100644 (file)
@@ -549,13 +549,15 @@ namespace OC
         *
         * @param queryParametersMap map which can have the query parameter name and value
         * @param attributeHandler handles callback
-
+        * @param qos the quality of communication
+        *
         * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
         * @note OCStackResult is defined in ocstack.h.
         *
         */
         OCStackResult discoveryMQTopics(const QueryParamsMap& queryParametersMap,
-                                        FindCallback attributeHandler);
+                                        MQTopicCallback attributeHandler,
+                                        QualityOfService qos);
         /**
         * Function to create Topic into MQ Broker.
         * SubTopic is also created through this method.
@@ -564,7 +566,8 @@ namespace OC
         * @param topicUri new uri of the topic which want to create
         * @param queryParametersMap map which can have the query parameter name and value
         * @param attributeHandler handles callback
-
+        * @param qos the quality of communication
+        *
         * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
         * @note OCStackResult is defined in ocstack.h.
         *
@@ -572,7 +575,8 @@ namespace OC
         OCStackResult createMQTopic(const OCRepresentation& rep,
                                     const std::string& topicUri,
                                     const QueryParamsMap& queryParametersMap,
-                                    MQCreateTopicCallback attributeHandler);
+                                    MQTopicCallback attributeHandler,
+                                    QualityOfService qos);
 #endif
 #ifdef MQ_SUBSCRIBER
         /**
@@ -581,23 +585,27 @@ namespace OC
         * @param observeType allows the client to specify how it wants to observe.
         * @param queryParametersMap map which can have the query parameter name and value
         * @param observeHandler handles callback
-
+        * @param qos the quality of communication
+        *
         * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
         * @note OCStackResult is defined in ocstack.h.
         *
         */
         OCStackResult subscribeMQTopic(ObserveType observeType,
                                        const QueryParamsMap& queryParametersMap,
-                                       ObserveCallback observeHandler);
+                                       ObserveCallback observeHandler,
+                                       QualityOfService qos);
 
         /**
         * Function to unsubscribe Topic to MQ Broker.
         *
+        * @param qos the quality of communication
+        *
         * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
         * @note OCStackResult is defined in ocstack.h.
         *
         */
-        OCStackResult unsubscribeMQTopic();
+        OCStackResult unsubscribeMQTopic(QualityOfService qos);
 
         /**
         * Function to request publish to MQ publisher.
@@ -605,13 +613,15 @@ namespace OC
         *
         * @param queryParametersMap map which can have the query parameter name and value
         * @param attributeHandler handles callback
-
+        * @param qos the quality of communication
+        *
         * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
         * @note OCStackResult is defined in ocstack.h.
         *
         */
         OCStackResult requestMQPublish(const QueryParamsMap& queryParametersMap,
-                                       PostCallback attributeHandler);
+                                       PostCallback attributeHandler,
+                                       QualityOfService qos);
 #endif
 #ifdef MQ_PUBLISHER
         /**
@@ -620,14 +630,16 @@ namespace OC
         * @param rep representation of the topic
         * @param queryParametersMap map which can have the query parameter name and value
         * @param attributeHandler handles callback
-
+        * @param qos the quality of communication
+        *
         * @return Returns  ::OC_STACK_OK on success, some other value upon failure.
         * @note OCStackResult is defined in ocstack.h.
         *
         */
         OCStackResult publishMQTopic(const OCRepresentation& rep,
                                      const QueryParamsMap& queryParametersMap,
-                                     PostCallback attributeHandler);
+                                     PostCallback attributeHandler,
+                                     QualityOfService qos);
 #endif
         // overloaded operators allow for putting into a 'set'
         // the uniqueidentifier allows for putting into a hash
index e349508f49c1c52cf1d68905290857f3fa9f08fb..8e7e13ced44f983cde330ad5e9fe97c37ee8cbff 100644 (file)
@@ -153,7 +153,7 @@ namespace OC
                                                const std::string& /*resourceUri*/,
                                                const QueryParamsMap& /*queryParams*/,
                                                const HeaderOptions& /*headerOptions*/,
-                                               FindCallback& /*callback*/,
+                                               MQTopicCallback& /*callback*/,
                                                QualityOfService /*QoS*/)
             {return OC_STACK_NOTIMPL;}
 
@@ -162,7 +162,7 @@ namespace OC
                                                        const OCRepresentation& /*rep*/,
                                                        const QueryParamsMap& /*queryParams*/,
                                                        const HeaderOptions& /*headerOptions*/,
-                                                       MQCreateTopicCallback& /*callback*/,
+                                                       MQTopicCallback& /*callback*/,
                                                        QualityOfService /*QoS*/)
             {return OC_STACK_NOTIMPL;}
 #endif
index 2f223d9ca57259afd70420369c97e3ebd586d46c..bb6d860323a63faa413957a3c7af03a15d5c9a1f 100644 (file)
@@ -327,14 +327,14 @@ namespace OC
     }
 #ifdef WITH_MQ
     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
-            OCClientResponse* clientResponse)
+                                              OCClientResponse* clientResponse)
     {
-        ClientCallbackContext::ListenContext* context =
-            static_cast<ClientCallbackContext::ListenContext*>(ctx);
+        ClientCallbackContext::MQTopicContext* context =
+            static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
 
-        if (!clientResponse)
+        if (!clientResponse || !context)
         {
-            return OC_STACK_KEEP_TRANSACTION;
+            return OC_STACK_DELETE_TRANSACTION;
         }
 
         if (clientResponse->result != OC_STACK_OK)
@@ -343,7 +343,11 @@ namespace OC
                     << clientResponse->result
                     << std::flush;
 
-            return OC_STACK_KEEP_TRANSACTION;
+            std::thread exec(context->callback, clientResponse->result,
+                             clientResponse->resourceUri, nullptr);
+            exec.detach();
+
+            return OC_STACK_DELETE_TRANSACTION;
         }
 
         auto clientWrapper = context->clientWrapper.lock();
@@ -351,7 +355,7 @@ namespace OC
         {
             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
                     << std::flush;
-            return OC_STACK_KEEP_TRANSACTION;
+            return OC_STACK_DELETE_TRANSACTION;
         }
 
         try{
@@ -361,7 +365,8 @@ namespace OC
             // loop to ensure valid construction of all resources
             for (auto resource : container.Resources())
             {
-                std::thread exec(context->callback, resource);
+                std::thread exec(context->callback, clientResponse->result,
+                                 clientResponse->resourceUri, resource);
                 exec.detach();
             }
         }
@@ -370,15 +375,15 @@ namespace OC
                     << e.what() << std::flush;
         }
 
-
-        return OC_STACK_KEEP_TRANSACTION;
+        return OC_STACK_DELETE_TRANSACTION;
     }
 
-    OCStackResult InProcClientWrapper::ListenForMQTopic(
-            const OCDevAddr& devAddr,
-            const std::string& resourceUri,
-            const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
-            FindCallback& callback, QualityOfService QoS)
+    OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
+                                                        const std::string& resourceUri,
+                                                        const QueryParamsMap& queryParams,
+                                                        const HeaderOptions& headerOptions,
+                                                        MQTopicCallback& callback,
+                                                        QualityOfService QoS)
     {
         oclog() << "ListenForMQTopic()" << std::flush;
 
@@ -387,12 +392,12 @@ namespace OC
             return OC_STACK_INVALID_PARAM;
         }
 
-        ClientCallbackContext::ListenContext* context =
-            new ClientCallbackContext::ListenContext(callback, shared_from_this());
+        ClientCallbackContext::MQTopicContext* context =
+            new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
         OCCallbackData cbdata;
         cbdata.context = static_cast<void*>(context),
         cbdata.cb      = listenMQCallback;
-        cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
+        cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
 
         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
 
@@ -515,32 +520,23 @@ namespace OC
     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
                     OCClientResponse* clientResponse)
     {
-        ClientCallbackContext::CreateMQTopicContext* context =
-            static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
-        OCRepresentation rep;
+        ClientCallbackContext::MQTopicContext* context =
+            static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
         HeaderOptions serverHeaderOptions;
 
-        if (!clientResponse)
+        if (!clientResponse || !context)
         {
             return OC_STACK_DELETE_TRANSACTION;
         }
 
         std::string createdUri;
+        bool isLocationOption = false;
         OCStackResult result = clientResponse->result;
         if (OC_STACK_OK               == result ||
             OC_STACK_RESOURCE_CREATED == result)
         {
             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
-            try
-            {
-                rep = parseGetSetCallback(clientResponse);
-            }
-            catch(OC::OCException& e)
-            {
-                result = e.code();
-            }
 
-            bool isLocationOption = false;
             for (auto headerOption : serverHeaderOptions)
             {
                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
@@ -553,11 +549,11 @@ namespace OC
                     }
                 }
             }
+        }
 
-            if (!isLocationOption)
-            {
-                createdUri = clientResponse->resourceUri;
-            }
+        if (!isLocationOption)
+        {
+            createdUri = clientResponse->resourceUri;
         }
 
         auto clientWrapper = context->clientWrapper.lock();
@@ -577,13 +573,13 @@ namespace OC
                                             createdUri);
                 for (auto resource : container.Resources())
                 {
-                    std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
+                    std::thread exec(context->callback, result, createdUri, resource);
                     exec.detach();
                 }
             }
             else
             {
-                std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
+                std::thread exec(context->callback, result, createdUri, nullptr);
                 exec.detach();
             }
         }
@@ -599,19 +595,19 @@ namespace OC
                 const std::string& uri,
                 const OCRepresentation& rep,
                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
-                MQCreateTopicCallback& callback, QualityOfService QoS)
+                MQTopicCallback& callback, QualityOfService QoS)
     {
         if (!callback)
         {
             return OC_STACK_INVALID_PARAM;
         }
         OCStackResult result;
-        ClientCallbackContext::CreateMQTopicContext* ctx =
-                new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
+        ClientCallbackContext::MQTopicContext* ctx =
+                new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
         OCCallbackData cbdata;
         cbdata.context = static_cast<void*>(ctx),
         cbdata.cb      = createMQTopicCallback;
-        cbdata.cd      = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
+        cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
 
         std::string url = assembleSetResourceUri(uri, queryParams);
 
index 6338ee1901f039c5890825b28e596b798561c091..b83061f5227612a0dcbf91e880cfe281eb3a6587 100644 (file)
@@ -588,64 +588,57 @@ std::string OCResource::sid() const
 
 #ifdef WITH_MQ
 OCStackResult OCResource::discoveryMQTopics(const QueryParamsMap& queryParametersMap,
-                                            FindCallback attributeHandler)
+                                            MQTopicCallback attributeHandler,
+                                            QualityOfService qos)
 {
-    QualityOfService defaultQos = OC::QualityOfService::NaQos;
-    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
     return checked_guard(m_clientWrapper.lock(),
                             &IClientWrapper::ListenForMQTopic,
                             m_devAddr, m_uri,
                             queryParametersMap, m_headerOptions,
-                            attributeHandler, defaultQos);
+                            attributeHandler, qos);
 }
 
 OCStackResult OCResource::createMQTopic(const OCRepresentation& rep,
                                         const std::string& topicUri,
                                         const QueryParamsMap& queryParametersMap,
-                                        MQCreateTopicCallback attributeHandler)
+                                        MQTopicCallback attributeHandler,
+                                        QualityOfService qos)
 {
-    QualityOfService defaultQos = OC::QualityOfService::NaQos;
-    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
     return checked_guard(m_clientWrapper.lock(), &IClientWrapper::PutMQTopicRepresentation,
                          m_devAddr, topicUri, rep, queryParametersMap,
-                         m_headerOptions, attributeHandler, defaultQos);
+                         m_headerOptions, attributeHandler, qos);
 }
 #endif
 #ifdef MQ_SUBSCRIBER
 OCStackResult OCResource::subscribeMQTopic(ObserveType observeType,
-        const QueryParamsMap& queryParametersMap, ObserveCallback observeHandler)
+                                           const QueryParamsMap& queryParametersMap,
+                                           ObserveCallback observeHandler,
+                                           QualityOfService qos)
 {
-    QualityOfService defaultQoS = OC::QualityOfService::NaQos;
-    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQoS);
-
-    return result_guard(observe(observeType, queryParametersMap, observeHandler, defaultQoS));
+    return result_guard(observe(observeType, queryParametersMap, observeHandler, qos));
 }
 
-OCStackResult OCResource::unsubscribeMQTopic()
+OCStackResult OCResource::unsubscribeMQTopic(QualityOfService qos)
 {
-    QualityOfService defaultQoS = OC::QualityOfService::NaQos;
-    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQoS);
-    return result_guard(cancelObserve(defaultQoS));
+    return result_guard(cancelObserve(qos));
 }
 
 OCStackResult OCResource::requestMQPublish(const QueryParamsMap& queryParametersMap,
-                                           PostCallback attributeHandler)
+                                           PostCallback attributeHandler,
+                                           QualityOfService qos)
 {
     OCRepresentation rep;
     rep.setValue(std::string("req_pub"), std::string("true"));
-    QualityOfService defaultQos = OC::QualityOfService::NaQos;
-    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
-    return result_guard(post(rep, queryParametersMap, attributeHandler, defaultQos));
+    return result_guard(post(rep, queryParametersMap, attributeHandler, qos));
 }
 #endif
 #ifdef MQ_PUBLISHER
 OCStackResult OCResource::publishMQTopic(const OCRepresentation& rep,
                                          const QueryParamsMap& queryParametersMap,
-                                         PostCallback attributeHandler)
+                                         PostCallback attributeHandler,
+                                         QualityOfService qos)
 {
-    QualityOfService defaultQos = OC::QualityOfService::NaQos;
-    checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
-    return result_guard(post(rep, queryParametersMap, attributeHandler, defaultQos));
+    return result_guard(post(rep, queryParametersMap, attributeHandler, qos));
 }
 #endif