Added preprocessor WITH_CLOUD for Subscribe device presence.
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
index 1d68a2f..76cce93 100644 (file)
@@ -169,6 +169,7 @@ namespace OC
             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
             // loop to ensure valid construction of all resources
+
             for(auto resource : container.Resources())
             {
                 std::thread exec(context->callback, resource);
@@ -324,6 +325,101 @@ namespace OC
         }
         return result;
     }
+#ifdef WITH_MQ
+    OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
+            OCClientResponse* clientResponse)
+    {
+        ClientCallbackContext::ListenContext* context =
+            static_cast<ClientCallbackContext::ListenContext*>(ctx);
+
+        if (!clientResponse)
+        {
+            return OC_STACK_KEEP_TRANSACTION;
+        }
+
+        if (clientResponse->result != OC_STACK_OK)
+        {
+            oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
+                    << clientResponse->result
+                    << std::flush;
+
+            return OC_STACK_KEEP_TRANSACTION;
+        }
+
+        auto clientWrapper = context->clientWrapper.lock();
+        if (!clientWrapper)
+        {
+            oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
+                    << std::flush;
+            return OC_STACK_KEEP_TRANSACTION;
+        }
+
+        try{
+            ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+                                        (OCRepPayload *) clientResponse->payload);
+
+            // loop to ensure valid construction of all resources
+            for (auto resource : container.Resources())
+            {
+                std::thread exec(context->callback, resource);
+                exec.detach();
+            }
+        }
+        catch (std::exception &e){
+            oclog() << "Exception in listCallback, ignoring response: "
+                    << e.what() << std::flush;
+        }
+
+
+        return OC_STACK_KEEP_TRANSACTION;
+    }
+
+    OCStackResult InProcClientWrapper::ListenForMQTopic(
+            const OCDevAddr& devAddr,
+            const std::string& resourceUri,
+            const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+            FindCallback& callback, QualityOfService QoS)
+    {
+        oclog() << "ListenForMQTopic()" << std::flush;
+
+        if (!callback)
+        {
+            return OC_STACK_INVALID_PARAM;
+        }
+
+        ClientCallbackContext::ListenContext* context =
+            new ClientCallbackContext::ListenContext(callback, shared_from_this());
+        OCCallbackData cbdata;
+        cbdata.context = static_cast<void*>(context),
+        cbdata.cb      = listenMQCallback;
+        cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
+
+        std::string uri = assembleSetResourceUri(resourceUri, queryParams);
+
+        OCStackResult result = OC_STACK_ERROR;
+        auto cLock = m_csdkLock.lock();
+        if (cLock)
+        {
+            std::lock_guard<std::recursive_mutex> lock(*cLock);
+            OCHeaderOption options[MAX_HEADER_OPTIONS];
+            result = OCDoResource(
+                                  nullptr, OC_REST_GET,
+                                  uri.c_str(),
+                                  &devAddr, nullptr,
+                                  CT_DEFAULT,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  assembleHeaderOptions(options, headerOptions),
+                                  headerOptions.size());
+        }
+        else
+        {
+            delete context;
+        }
+
+        return result;
+    }
+#endif
 
     OCStackApplicationResult listenDeviceCallback(void* ctx,
                                                   OCDoHandle /*handle*/,
@@ -415,6 +511,135 @@ namespace OC
         }
     }
 
+#ifdef WITH_MQ
+    OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
+                    OCClientResponse* clientResponse)
+    {
+        ClientCallbackContext::CreateMQTopicContext* context =
+            static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
+        OCRepresentation rep;
+        HeaderOptions serverHeaderOptions;
+
+        if (!clientResponse)
+        {
+            return OC_STACK_DELETE_TRANSACTION;
+        }
+
+        std::string createdUri;
+        OCStackResult result = clientResponse->result;
+        if (OC_STACK_OK               == result ||
+            OC_STACK_RESOURCE_CREATED == result)
+        {
+            parseServerHeaderOptions(clientResponse, serverHeaderOptions);
+            try
+            {
+                rep = parseGetSetCallback(clientResponse);
+            }
+            catch(OC::OCException& e)
+            {
+                result = e.code();
+            }
+
+            bool isLocationOption = false;
+            for (auto headerOption : serverHeaderOptions)
+            {
+                if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
+                {
+                    createdUri += "/";
+                    createdUri += headerOption.getOptionData();
+                    if (!isLocationOption)
+                    {
+                        isLocationOption = true;
+                    }
+                }
+            }
+
+            if (!isLocationOption)
+            {
+                createdUri = clientResponse->resourceUri;
+            }
+        }
+
+        auto clientWrapper = context->clientWrapper.lock();
+
+        if (!clientWrapper)
+        {
+            oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
+                    << std::flush;
+            return OC_STACK_DELETE_TRANSACTION;
+        }
+
+        try{
+            if (OC_STACK_OK               == result ||
+                OC_STACK_RESOURCE_CREATED == result)
+            {
+                ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+                                            createdUri);
+                for (auto resource : container.Resources())
+                {
+                    std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
+                    exec.detach();
+                }
+            }
+            else
+            {
+                std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
+                exec.detach();
+            }
+        }
+        catch (std::exception &e){
+            oclog() << "Exception in createMQTopicCallback, ignoring response: "
+                    << e.what() << std::flush;
+        }
+        return OC_STACK_DELETE_TRANSACTION;
+    }
+
+    OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
+                const OCDevAddr& devAddr,
+                const std::string& uri,
+                const OCRepresentation& rep,
+                const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+                MQCreateTopicCallback& callback, QualityOfService QoS)
+    {
+        if (!callback)
+        {
+            return OC_STACK_INVALID_PARAM;
+        }
+        OCStackResult result;
+        ClientCallbackContext::CreateMQTopicContext* ctx =
+                new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
+        OCCallbackData cbdata;
+        cbdata.context = static_cast<void*>(ctx),
+        cbdata.cb      = createMQTopicCallback;
+        cbdata.cd      = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
+
+        std::string url = assembleSetResourceUri(uri, queryParams);
+
+        auto cLock = m_csdkLock.lock();
+
+        if (cLock)
+        {
+            std::lock_guard<std::recursive_mutex> lock(*cLock);
+            OCHeaderOption options[MAX_HEADER_OPTIONS];
+
+            result = OCDoResource(nullptr, OC_REST_PUT,
+                                  url.c_str(), &devAddr,
+                                  assembleSetResourcePayload(rep),
+                                  CT_DEFAULT,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  assembleHeaderOptions(options, headerOptions),
+                                  headerOptions.size());
+        }
+        else
+        {
+            delete ctx;
+            result = OC_STACK_ERROR;
+        }
+
+        return result;
+    }
+#endif
     OCStackApplicationResult getResourceCallback(void* ctx,
                                                  OCDoHandle /*handle*/,
         OCClientResponse* clientResponse)
@@ -559,6 +784,52 @@ namespace OC
         return ret;
     }
 
+    std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
+        const QueryParamsList& queryParams)
+    {
+        if (!uri.empty())
+        {
+            if (uri.back() == '/')
+            {
+                uri.resize(uri.size() - 1);
+            }
+        }
+
+        ostringstream paramsList;
+        if (queryParams.size() > 0)
+        {
+            paramsList << '?';
+        }
+
+        for (auto& param : queryParams)
+        {
+            for (auto& paramList : param.second)
+            {
+                paramsList << param.first << '=' << paramList;
+                if (paramList != param.second.back())
+                {
+                    paramsList << '&';
+                }
+            }
+            paramsList << ';';
+        }
+
+        std::string queryString = paramsList.str();
+
+        if (queryString.empty())
+        {
+            return uri;
+        }
+
+        if (queryString.back() == ';')
+        {
+            queryString.resize(queryString.size() - 1);
+        }
+
+        std::string ret = uri + queryString;
+        return ret;
+    }
+
     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
     {
         MessageContainer ocInfo;
@@ -576,6 +847,7 @@ namespace OC
         const std::string& uri,
         const OCRepresentation& rep,
         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+        OCConnectivityType connectivityType,
         PostCallback& callback, QualityOfService QoS)
     {
         if (!callback)
@@ -602,7 +874,7 @@ namespace OC
             result = OCDoResource(nullptr, OC_REST_POST,
                                   url.c_str(), &devAddr,
                                   assembleSetResourcePayload(rep),
-                                  CT_DEFAULT,
+                                  connectivityType,
                                   static_cast<OCQualityOfService>(QoS),
                                   &cbdata,
                                   assembleHeaderOptions(options, headerOptions),
@@ -925,6 +1197,52 @@ namespace OC
         return result;
     }
 
+#ifdef WITH_CLOUD
+    OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
+                                                               const std::string& host,
+                                                               const QueryParamsList& queryParams,
+                                                               OCConnectivityType connectivityType,
+                                                               ObserveCallback& callback)
+    {
+        if (!callback)
+        {
+            return OC_STACK_INVALID_PARAM;
+        }
+        OCStackResult result;
+
+        ClientCallbackContext::ObserveContext* ctx =
+            new ClientCallbackContext::ObserveContext(callback);
+        OCCallbackData cbdata;
+        cbdata.context = static_cast<void*>(ctx),
+        cbdata.cb      = observeResourceCallback;
+        cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
+
+        auto cLock = m_csdkLock.lock();
+
+        if (cLock)
+        {
+            std::lock_guard<std::recursive_mutex> lock(*cLock);
+
+            std::ostringstream os;
+            os << host << OCF_RSRVD_DEVICE_PRESENCE_URI;
+            std::string url = assembleSetResourceUri(os.str(), queryParams);
+
+            result = OCDoResource(handle, OC_REST_OBSERVE,
+                                  url.c_str(), nullptr,
+                                  nullptr, connectivityType,
+                                  OC_LOW_QOS, &cbdata,
+                                  nullptr, 0);
+        }
+        else
+        {
+            delete ctx;
+            result = OC_STACK_ERROR;
+        }
+
+        return result;
+    }
+#endif
+
     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
     {
         qos = m_cfg.QoS;