Implement ServerID parsing and sample app
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
index a56833a..9fd3153 100644 (file)
 //
 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
 
-#include <new>
-
 #include "InProcClientWrapper.h"
 #include "ocstack.h"
 
 #include "OCPlatform.h"
 #include "OCResource.h"
-
+#include <OCSerialization.h>
 using namespace std;
 
 namespace OC
 {
-    InProcClientWrapper::InProcClientWrapper(OC::OCPlatform_impl& owner,
+    InProcClientWrapper::InProcClientWrapper(
         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
-            : IClientWrapper(owner),
-              m_threadRun(false), m_csdkLock(csdkLock),
-              m_owner(owner),
+            : m_threadRun(false), m_csdkLock(csdkLock),
               m_cfg { cfg }
     {
         // if the config type is server, we ought to never get called.  If the config type
         // is both, we count on the server to run the thread and do the initialize
+
         if(m_cfg.mode == ModeType::Client)
         {
             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
@@ -61,7 +58,12 @@ namespace OC
             m_listeningThread.join();
         }
 
-        OCStop();
+        // only stop if we are the ones who actually called 'init'.  We are counting
+        // on the server to do the stop.
+        if(m_cfg.mode == ModeType::Client)
+        {
+            OCStop();
+        }
     }
 
     void InProcClientWrapper::listeningFunc()
@@ -90,68 +92,38 @@ namespace OC
         }
     }
 
-
-
-    std::string convertOCAddrToString(OCDevAddr& addr)
+    OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
     {
-        // TODO: we currently assume this is a IPV4 address, need to figure out the actual value
-
-        uint8_t a, b, c, d;
-        uint16_t port;
-
-        if(OCDevAddrToIPv4Addr(&addr, &a, &b, &c, &d) ==0 && OCDevAddrToPort(&addr, &port)==0)
-        {
-            ostringstream os;
-            os << "coap://"<<(int)a<<'.'<<(int)b<<'.'<<(int)c<<'.'<<(int)d<<':'<<(int)port;
-            return os.str();
-        }
-        else
+        if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
         {
-            return OC::Error::INVALID_IP;
+            throw OCException(OC::Exception::STR_NULL_RESPONSE, OC_STACK_ERROR);
         }
-    }
-
-    struct ListenContext
-    {
-        FindCallback              callback;
-        IClientWrapper::Ptr       clientWrapper;
-    };
 
+        MessageContainer oc;
+        oc.setJSONRepresentation(clientResponse->resJSONPayload);
 
-    std::shared_ptr<OCResource> InProcClientWrapper::parseOCResource(
-        IClientWrapper::Ptr clientWrapper, const std::string& host,
-        const boost::property_tree::ptree resourceNode)
-    {
-        std::string uri = resourceNode.get<std::string>(OC::Key::URIKEY, "");
-        bool obs = resourceNode.get<int>(OC::Key::OBSERVABLEKEY,0) == 1;
-        std::vector<std::string> rTs;
-        std::vector<std::string> ifaces;
-
-        boost::property_tree::ptree properties =
-            resourceNode.get_child(OC::Key::PROPERTYKEY, boost::property_tree::ptree());
-
-        boost::property_tree::ptree rT =
-            properties.get_child(OC::Key::RESOURCETYPESKEY, boost::property_tree::ptree());
-        for(auto itr : rT)
+        std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
+        if(it == oc.representations().end())
         {
-            rTs.push_back(itr.second.data());
+            throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_ERROR);
         }
 
-        boost::property_tree::ptree iF =
-            properties.get_child(OC::Key::INTERFACESKEY, boost::property_tree::ptree());
-        for(auto itr : iF)
-        {
-            ifaces.push_back(itr.second.data());
-        }
+        // first one is considered the root, everything else is considered a child of this one.
+        OCRepresentation root = *it;
+        ++it;
+
+        std::for_each(it, oc.representations().end(),
+                [&root](const OCRepresentation& repItr)
+                {root.addChild(repItr);});
+        return root;
 
-        return std::shared_ptr<OCResource>(
-            new OCResource(clientWrapper, host, uri, obs, rTs, ifaces));
     }
 
     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
         OCClientResponse* clientResponse)
     {
-        ListenContext* context = static_cast<ListenContext*>(ctx);
+        ClientCallbackContext::ListenContext* context =
+            static_cast<ClientCallbackContext::ListenContext*>(ctx);
 
         if(clientResponse->result != OC_STACK_OK)
         {
@@ -162,205 +134,157 @@ namespace OC
             return OC_STACK_KEEP_TRANSACTION;
         }
 
+        auto clientWrapper = context->clientWrapper.lock();
+
+        if(!clientWrapper)
+        {
+            oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
+                    << std::flush;
+            return OC_STACK_KEEP_TRANSACTION;
+        }
+
         std::stringstream requestStream;
         requestStream << clientResponse->resJSONPayload;
 
-        boost::property_tree::ptree root;
-
         try
         {
-                boost::property_tree::read_json(requestStream, root);
-        }
-        catch(boost::property_tree::json_parser::json_parser_error &e)
-        {
-                oclog() << "listenCallback(): read_json() failed: " << e.what()
-                        << std::flush;
-
-                return OC_STACK_KEEP_TRANSACTION;
-        }
+            ListenOCContainer container(clientWrapper, *clientResponse->addr,
+                    requestStream);
 
-        boost::property_tree::ptree payload =
-                root.get_child(OC::Key::OCKEY, boost::property_tree::ptree());
+            // loop to ensure valid construction of all resources
+            for(auto resource : container.Resources())
+            {
+                std::thread exec(context->callback, resource);
+                exec.detach();
+            }
 
-        for(auto payloadItr : payload)
+        }
+        catch(const std::exception& e)
         {
-                try
-                {
-                    std::string host = convertOCAddrToString(*clientResponse->addr);
-                    std::shared_ptr<OCResource> resource =
-                        context->clientWrapper->parseOCResource(context->clientWrapper, host,
-                        payloadItr.second);
-
-                    // Note: the call to detach allows the underlying thread to continue until
-                    // completion and allows us to destroy the exec object. This is apparently NOT
-                    // a memory leak, as the thread will apparently take care of itself.
-                    // Additionally, the only parameter here is a shared ptr, so OCResource will be
-                    // disposed of properly upon completion of the callback handler.
-                    std::thread exec(context->callback,resource);
-                    exec.detach();
-                }
-                catch(ResourceInitException& e)
-                {
-                    oclog() << "listenCallback(): failed to create resource: " << e.what()
-                            << std::flush;
-                }
+            oclog() << "listenCallback failed to parse a malformed message: "
+                    << e.what()
+                    << std::endl
+                    << clientResponse->resJSONPayload
+                    << std::endl
+                    << clientResponse->result
+                    << std::flush;
+            return OC_STACK_KEEP_TRANSACTION;
         }
 
         return OC_STACK_KEEP_TRANSACTION;
     }
 
+#ifdef CA_INT
+    OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
+        const std::string& resourceType, uint8_t connectivityType,
+        FindCallback& callback, QualityOfService QoS)
+#else
     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
+#endif
     {
         OCStackResult result;
 
         OCCallbackData cbdata = {0};
 
-        ListenContext* context = new ListenContext();
+        ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
         context->callback = callback;
         context->clientWrapper = shared_from_this();
 
         cbdata.context =  static_cast<void*>(context);
         cbdata.cb = listenCallback;
-        cbdata.cd = [](void* c){delete static_cast<ListenContext*>(c);};
+        cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
 
         auto cLock = m_csdkLock.lock();
         if(cLock)
         {
             std::lock_guard<std::recursive_mutex> lock(*cLock);
             OCDoHandle handle;
+#ifdef CA_INT
+            result = OCDoResource(&handle, OC_REST_GET,
+                                  resourceType.c_str(),
+                                  nullptr, nullptr, connectivityType,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  NULL, 0);
+#else
             result = OCDoResource(&handle, OC_REST_GET,
                                   resourceType.c_str(),
                                   nullptr, nullptr,
                                   static_cast<OCQualityOfService>(QoS),
                                   &cbdata,
                                   NULL, 0);
+#endif
         }
         else
         {
+            delete context;
             result = OC_STACK_ERROR;
         }
         return result;
     }
 
-    struct GetContext
+    OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
+            OCClientResponse* clientResponse)
     {
-        GetCallback callback;
-    };
+        ClientCallbackContext::DeviceListenContext* context =
+            static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
 
-    struct SetContext
-    {
-        PutCallback callback;
-    };
+        OCRepresentation rep = parseGetSetCallback(clientResponse);
+        std::thread exec(context->callback, rep);
+        exec.detach();
 
+        return OC_STACK_KEEP_TRANSACTION;
+    }
 
-    OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
+#ifdef CA_INT
+    OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
+        const std::string& deviceURI, uint8_t connectivityType,
+        FindDeviceCallback& callback, QualityOfService QoS)
+#else
+    OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
+        const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
+#endif
     {
-        std::stringstream requestStream;
-        requestStream<<clientResponse->resJSONPayload;
-        if(strlen((char*)clientResponse->resJSONPayload) == 0)
-        {
-            return OCRepresentation();
-        }
+        OCStackResult result;
 
-        boost::property_tree::ptree root;
-        try
+        OCCallbackData cbdata = {0};
+
+        ClientCallbackContext::DeviceListenContext* context =
+            new ClientCallbackContext::DeviceListenContext();
+        context->callback = callback;
+        context->clientWrapper = shared_from_this();
+
+        cbdata.context =  static_cast<void*>(context);
+        cbdata.cb = listenDeviceCallback;
+        cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
+
+        auto cLock = m_csdkLock.lock();
+        if(cLock)
         {
-            boost::property_tree::read_json(requestStream, root);
+            std::lock_guard<std::recursive_mutex> lock(*cLock);
+            OCDoHandle handle;
+#ifdef CA_INT
+            result = OCDoResource(&handle, OC_REST_GET,
+                                  deviceURI.c_str(),
+                                  nullptr, nullptr, connectivityType,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  NULL, 0);
+#else
+            result = OCDoResource(&handle, OC_REST_GET,
+                                  deviceURI.c_str(),
+                                  nullptr, nullptr,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  NULL, 0);
+#endif
         }
-        catch(boost::property_tree::json_parser::json_parser_error &e)
+        else
         {
-            return OCRepresentation();
+            result = OC_STACK_ERROR;
         }
-        boost::property_tree::ptree payload = root.get_child(OC::Key::OCKEY, boost::property_tree::ptree());
-        OCRepresentation root_resource;
-        std::vector<OCRepresentation> children;
-        bool isRoot = true;
-        for ( auto payloadItr : payload)
-        {
-            OCRepresentation child;
-            try
-            {
-                auto resourceNode = payloadItr.second;
-                std::string uri = resourceNode.get<std::string>(OC::Key::URIKEY, "");
-
-                if (isRoot)
-                {
-                    root_resource.setUri(uri);
-                }
-                else
-                {
-                    child.setUri(uri);
-                }
-
-                if( resourceNode.count(OC::Key::PROPERTYKEY) != 0 )
-                {
-                    std::vector<std::string> rTs;
-                    std::vector<std::string> ifaces;
-                    boost::property_tree::ptree properties =
-                        resourceNode.get_child(OC::Key::PROPERTYKEY, boost::property_tree::ptree());
-
-                    boost::property_tree::ptree rT =
-                        properties.get_child(OC::Key::RESOURCETYPESKEY,
-                                                boost::property_tree::ptree());
-                    for(auto itr : rT)
-                    {
-                        rTs.push_back(itr.second.data());
-                    }
-
-                    boost::property_tree::ptree iF =
-                        properties.get_child(OC::Key::INTERFACESKEY, boost::property_tree::ptree());
-                    for(auto itr : iF)
-                    {
-                        ifaces.push_back(itr.second.data());
-                    }
-                    if (isRoot)
-                    {
-                        root_resource.setResourceInterfaces(ifaces);
-                        root_resource.setResourceTypes(rTs);
-                    }
-                    else
-                    {
-                        child.setResourceInterfaces(ifaces);
-                        child.setResourceTypes(rTs);
-                    }
-                }
-
-                if( resourceNode.count(OC::Key::REPKEY) != 0 )
-                {
-                    boost::property_tree::ptree rep =
-                        resourceNode.get_child(OC::Key::REPKEY, boost::property_tree::ptree());
-                    AttributeMap attrs;
-                    for( auto item : rep)
-                    {
-                        std::string name = item.first.data();
-                        std::string value = item.second.data();
-                        attrs[name] = value;
-                    }
-                    if (isRoot)
-                    {
-                        root_resource.setAttributeMap(attrs);
-                    }
-                    else
-                    {
-                        child.setAttributeMap(attrs);
-                    }
-                }
-
-                if (!isRoot)
-                    children.push_back(child);
-            }
-            catch (...)
-            {
-                // TODO
-            }
-
-            isRoot = false;
-         }
-
-         root_resource.setChildren(children);
-
-        return root_resource;
+        return result;
     }
 
     void parseServerHeaderOptions(OCClientResponse* clientResponse,
@@ -392,7 +316,8 @@ namespace OC
     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
         OCClientResponse* clientResponse)
     {
-        GetContext* context = static_cast<GetContext*>(ctx);
+        ClientCallbackContext::GetContext* context =
+            static_cast<ClientCallbackContext::GetContext*>(ctx);
 
         OCRepresentation rep;
         HeaderOptions serverHeaderOptions;
@@ -407,19 +332,26 @@ namespace OC
         return OC_STACK_DELETE_TRANSACTION;
     }
 
+#ifdef CA_INT
+    OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
+        const std::string& uri, uint8_t connectivityType, const QueryParamsMap& queryParams,
+        const HeaderOptions& headerOptions, GetCallback& callback,
+        QualityOfService QoS)
+#else
     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
         const std::string& uri, const QueryParamsMap& queryParams,
         const HeaderOptions& headerOptions, GetCallback& callback,
         QualityOfService QoS)
+#endif
     {
         OCStackResult result;
         OCCallbackData cbdata = {0};
 
-        GetContext* ctx = new GetContext();
+        ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
         ctx->callback = callback;
         cbdata.context = static_cast<void*>(ctx);
         cbdata.cb = &getResourceCallback;
-        cbdata.cd = [](void* c){delete static_cast<GetContext*>(c);};
+        cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
 
         auto cLock = m_csdkLock.lock();
 
@@ -433,14 +365,23 @@ namespace OC
             OCHeaderOption options[MAX_HEADER_OPTIONS];
 
             assembleHeaderOptions(options, headerOptions);
+#ifdef CA_INT
+            result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
+                                  nullptr, nullptr, connectivityType,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  options, headerOptions.size());
+#else
             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
                                   nullptr, nullptr,
                                   static_cast<OCQualityOfService>(QoS),
                                   &cbdata,
                                   options, headerOptions.size());
+#endif
         }
         else
         {
+            delete ctx;
             result = OC_STACK_ERROR;
         }
         return result;
@@ -450,7 +391,8 @@ namespace OC
     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
         OCClientResponse* clientResponse)
     {
-        SetContext* context = static_cast<SetContext*>(ctx);
+        ClientCallbackContext::SetContext* context =
+            static_cast<ClientCallbackContext::SetContext*>(ctx);
         OCRepresentation attrs;
         HeaderOptions serverHeaderOptions;
 
@@ -498,28 +440,30 @@ namespace OC
 
     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
     {
-        ostringstream payload;
-        // TODO need to change the format to "{"oc":[]}"
-        payload << "{\"oc\":";
-
-        payload << rep.getJSONRepresentation();
-
-        payload << "}";
-        return payload.str();
+        MessageContainer ocInfo;
+        ocInfo.addRepresentation(rep);
+        return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
     }
 
+#ifdef CA_INT
+    OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
+        const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
+        const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+        PostCallback& callback, QualityOfService QoS)
+#else
     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
         const std::string& uri, const OCRepresentation& rep,
         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
         PostCallback& callback, QualityOfService QoS)
+#endif
     {
         OCStackResult result;
         OCCallbackData cbdata = {0};
 
-        SetContext* ctx = new SetContext();
+        ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
         ctx->callback = callback;
         cbdata.cb = &setResourceCallback;
-        cbdata.cd = [](void* c){delete static_cast<SetContext*>(c);};
+        cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
         cbdata.context = static_cast<void*>(ctx);
 
         // TODO: in the future the cstack should be combining these two strings!
@@ -536,33 +480,48 @@ namespace OC
             OCDoHandle handle;
 
             assembleHeaderOptions(options, headerOptions);
+#ifdef CA_INT
+            result = OCDoResource(&handle, OC_REST_POST,
+                                  os.str().c_str(), nullptr,
+                                  assembleSetResourcePayload(rep).c_str(), connectivityType,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata, options, headerOptions.size());
+#else
             result = OCDoResource(&handle, OC_REST_POST,
                                   os.str().c_str(), nullptr,
                                   assembleSetResourcePayload(rep).c_str(),
                                   static_cast<OCQualityOfService>(QoS),
                                   &cbdata, options, headerOptions.size());
+#endif
         }
         else
         {
+            delete ctx;
             result = OC_STACK_ERROR;
         }
 
         return result;
     }
 
-
+#ifdef CA_INT
+    OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
+        const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
+        const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+        PutCallback& callback, QualityOfService QoS)
+#else
     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
         const std::string& uri, const OCRepresentation& rep,
         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
         PutCallback& callback, QualityOfService QoS)
+#endif
     {
         OCStackResult result;
         OCCallbackData cbdata = {0};
 
-        SetContext* ctx = new SetContext();
+        ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
         ctx->callback = callback;
         cbdata.cb = &setResourceCallback;
-        cbdata.cd = [](void* c){delete static_cast<SetContext*>(c);};
+        cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
         cbdata.context = static_cast<void*>(ctx);
 
         // TODO: in the future the cstack should be combining these two strings!
@@ -579,31 +538,36 @@ namespace OC
             OCHeaderOption options[MAX_HEADER_OPTIONS];
 
             assembleHeaderOptions(options, headerOptions);
+#ifdef CA_INT
+            result = OCDoResource(&handle, OC_REST_PUT,
+                                  os.str().c_str(), nullptr,
+                                  assembleSetResourcePayload(rep).c_str(), connectivityType,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  options, headerOptions.size());
+#else
             result = OCDoResource(&handle, OC_REST_PUT,
                                   os.str().c_str(), nullptr,
                                   assembleSetResourcePayload(rep).c_str(),
                                   static_cast<OCQualityOfService>(QoS),
                                   &cbdata,
                                   options, headerOptions.size());
+#endif
         }
         else
         {
+            delete ctx;
             result = OC_STACK_ERROR;
         }
 
         return result;
     }
 
-    struct DeleteContext
-    {
-        DeleteCallback callback;
-    };
-
     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
         OCClientResponse* clientResponse)
     {
-        DeleteContext* context = static_cast<DeleteContext*>(ctx);
-        OCRepresentation attrs;
+        ClientCallbackContext::DeleteContext* context =
+            static_cast<ClientCallbackContext::DeleteContext*>(ctx);
         HeaderOptions serverHeaderOptions;
 
         if(clientResponse->result == OC_STACK_OK)
@@ -615,17 +579,23 @@ namespace OC
         return OC_STACK_DELETE_TRANSACTION;
     }
 
+#ifdef CA_INT
+    OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
+        const std::string& uri, uint8_t connectivityType, const HeaderOptions& headerOptions,
+         DeleteCallback& callback, QualityOfService QoS)
+#else
     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
         const std::string& uri, const HeaderOptions& headerOptions,
          DeleteCallback& callback, QualityOfService QoS)
+#endif
     {
         OCStackResult result;
         OCCallbackData cbdata = {0};
 
-        DeleteContext* ctx = new DeleteContext();
+        ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
         ctx->callback = callback;
         cbdata.cb = &deleteResourceCallback;
-        cbdata.cd = [](void* c){delete static_cast<DeleteContext*>(c);};
+        cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
         cbdata.context = static_cast<void*>(ctx);
 
         ostringstream os;
@@ -641,29 +611,33 @@ namespace OC
             assembleHeaderOptions(options, headerOptions);
 
             std::lock_guard<std::recursive_mutex> lock(*cLock);
-
+#ifdef CA_INT
+            result = OCDoResource(&handle, OC_REST_DELETE,
+                                  os.str().c_str(), nullptr,
+                                  nullptr, connectivityType,
+                                  static_cast<OCQualityOfService>(m_cfg.QoS),
+                                  &cbdata, options, headerOptions.size());
+#else
             result = OCDoResource(&handle, OC_REST_DELETE,
                                   os.str().c_str(), nullptr,
                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
                                   &cbdata, options, headerOptions.size());
+#endif
         }
         else
         {
+            delete ctx;
             result = OC_STACK_ERROR;
         }
 
         return result;
     }
 
-    struct ObserveContext
-    {
-        ObserveCallback callback;
-    };
-
     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
         OCClientResponse* clientResponse)
     {
-        ObserveContext* context = static_cast<ObserveContext*>(ctx);
+        ClientCallbackContext::ObserveContext* context =
+            static_cast<ClientCallbackContext::ObserveContext*>(ctx);
         OCRepresentation attrs;
         HeaderOptions serverHeaderOptions;
         uint32_t sequenceNumber = clientResponse->sequenceNumber;
@@ -679,18 +653,25 @@ namespace OC
         return OC_STACK_KEEP_TRANSACTION;
     }
 
+#ifdef CA_INT
+    OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
+        const std::string& host, const std::string& uri, uint8_t connectivityType,
+        const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+        ObserveCallback& callback, QualityOfService QoS)
+#else
     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
+#endif
     {
         OCStackResult result;
         OCCallbackData cbdata = {0};
 
-        ObserveContext* ctx = new ObserveContext();
+        ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
         ctx->callback = callback;
         cbdata.context = static_cast<void*>(ctx);
         cbdata.cb = &observeResourceCallback;
-        cbdata.cd = [](void* c){delete static_cast<ObserveContext*>(c);};
+        cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
 
         OCMethod method;
         if (observeType == ObserveType::Observe)
@@ -717,15 +698,25 @@ namespace OC
             OCHeaderOption options[MAX_HEADER_OPTIONS];
 
             assembleHeaderOptions(options, headerOptions);
+#ifdef CA_INT
+            result = OCDoResource(handle, method,
+                                  os.str().c_str(), nullptr,
+                                  nullptr, connectivityType,
+                                  static_cast<OCQualityOfService>(QoS),
+                                  &cbdata,
+                                  options, headerOptions.size());
+#else
             result = OCDoResource(handle, method,
                                   os.str().c_str(), nullptr,
                                   nullptr,
                                   static_cast<OCQualityOfService>(QoS),
                                   &cbdata,
                                   options, headerOptions.size());
+#endif
         }
         else
         {
+            delete ctx;
             return OC_STACK_ERROR;
         }
 
@@ -745,7 +736,8 @@ namespace OC
             OCHeaderOption options[MAX_HEADER_OPTIONS];
 
             assembleHeaderOptions(options, headerOptions);
-            result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options, headerOptions.size());
+            result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
+                    headerOptions.size());
         }
         else
         {
@@ -755,42 +747,76 @@ namespace OC
         return result;
     }
 
-    struct SubscribePresenceContext
-    {
-        SubscribeCallback callback;
-    };
-
     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
-        OCClientResponse* clientResponse)
+            OCClientResponse* clientResponse)
     {
-        SubscribePresenceContext* context = static_cast<SubscribePresenceContext*>(ctx);
-        std::thread exec(context->callback, clientResponse->result, clientResponse->sequenceNumber);
+        char stringAddress[DEV_ADDR_SIZE_MAX];
+        ostringstream os;
+        uint16_t port;
 
-        exec.detach();
+        if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
+                OCDevAddrToPort(clientResponse->addr, &port) == 0)
+        {
+            os<<stringAddress<<":"<<port;
+
+            ClientCallbackContext::SubscribePresenceContext* context =
+                static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
+
+            std::thread exec(context->callback, clientResponse->result,
+                    clientResponse->sequenceNumber, os.str());
+
+            exec.detach();
+        }
+        else
+        {
+            oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
+                    <<"failed"<< std::flush;
+        }
         return OC_STACK_KEEP_TRANSACTION;
     }
 
+#ifdef CA_INT
+    OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
+        const std::string& host, const std::string& resourceType, uint8_t connectivityType,
+        SubscribeCallback& presenceHandler)
+#else
     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
-        const std::string& host, SubscribeCallback& presenceHandler)
+        const std::string& host, const std::string& resourceType,
+        SubscribeCallback& presenceHandler)
+#endif
     {
         OCCallbackData cbdata = {0};
 
-        SubscribePresenceContext* ctx = new SubscribePresenceContext();
+        ClientCallbackContext::SubscribePresenceContext* ctx =
+            new ClientCallbackContext::SubscribePresenceContext();
         ctx->callback = presenceHandler;
         cbdata.cb = &subscribePresenceCallback;
         cbdata.context = static_cast<void*>(ctx);
-        cbdata.cd = [](void* c){delete static_cast<SubscribePresenceContext*>(c);};
+        cbdata.cd = [](void* c)
+            {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
         auto cLock = m_csdkLock.lock();
 
         std::ostringstream os;
-
         os << host << "/oc/presence";
 
+        if(!resourceType.empty())
+        {
+            os << "?rt=" << resourceType;
+        }
+
         if(!cLock)
+        {
+            delete ctx;
             return OC_STACK_ERROR;
+        }
 
+#ifdef CA_INT
+        return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
+                            connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
+#else
         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
                             OC_LOW_QOS, &cbdata, NULL, 0);
+#endif
     }
 
     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)