[CA-Integration] Added code to get connectivity type from C to C++ layer
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             throw OCException(OC::Exception::STR_NULL_RESPONSE, OC_STACK_ERROR);
100         }
101
102         MessageContainer oc;
103         oc.setJSONRepresentation(clientResponse->resJSONPayload);
104
105         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
106         if(it == oc.representations().end())
107         {
108             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_ERROR);
109         }
110
111         // first one is considered the root, everything else is considered a child of this one.
112         OCRepresentation root = *it;
113         ++it;
114
115         std::for_each(it, oc.representations().end(),
116                 [&root](const OCRepresentation& repItr)
117                 {root.addChild(repItr);});
118         return root;
119
120     }
121
122     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
123         OCClientResponse* clientResponse)
124     {
125         ClientCallbackContext::ListenContext* context =
126             static_cast<ClientCallbackContext::ListenContext*>(ctx);
127
128         if(clientResponse->result != OC_STACK_OK)
129         {
130             oclog() << "listenCallback(): failed to create resource. clientResponse: "
131                     << clientResponse->result
132                     << std::flush;
133
134             return OC_STACK_KEEP_TRANSACTION;
135         }
136
137         auto clientWrapper = context->clientWrapper.lock();
138
139         if(!clientWrapper)
140         {
141             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
142                     << std::flush;
143             return OC_STACK_KEEP_TRANSACTION;
144         }
145
146         std::stringstream requestStream;
147         requestStream << clientResponse->resJSONPayload;
148
149         try
150         {
151 #ifdef CA_INT
152             ListenOCContainer container(clientWrapper, *clientResponse->addr,
153                     clientResponse->connType, requestStream);
154 #else
155             ListenOCContainer container(clientWrapper, *clientResponse->addr,
156                     requestStream);
157 #endif
158             // loop to ensure valid construction of all resources
159             for(auto resource : container.Resources())
160             {
161                 std::thread exec(context->callback, resource);
162                 exec.detach();
163             }
164
165         }
166         catch(const std::exception& e)
167         {
168             oclog() << "listenCallback failed to parse a malformed message: "
169                     << e.what()
170                     << std::endl
171                     << clientResponse->resJSONPayload
172                     << std::endl
173                     << clientResponse->result
174                     << std::flush;
175             return OC_STACK_KEEP_TRANSACTION;
176         }
177
178         return OC_STACK_KEEP_TRANSACTION;
179     }
180
181 #ifdef CA_INT
182     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
183         const std::string& resourceType, uint8_t connectivityType,
184         FindCallback& callback, QualityOfService QoS)
185 #else
186     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
187         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
188 #endif
189     {
190         OCStackResult result;
191
192         OCCallbackData cbdata = {0};
193
194         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
195         context->callback = callback;
196         context->clientWrapper = shared_from_this();
197
198         cbdata.context =  static_cast<void*>(context);
199         cbdata.cb = listenCallback;
200         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
201
202         auto cLock = m_csdkLock.lock();
203         if(cLock)
204         {
205             std::lock_guard<std::recursive_mutex> lock(*cLock);
206             OCDoHandle handle;
207 #ifdef CA_INT
208             result = OCDoResource(&handle, OC_REST_GET,
209                                   resourceType.c_str(),
210                                   nullptr, nullptr, connectivityType,
211                                   static_cast<OCQualityOfService>(QoS),
212                                   &cbdata,
213                                   NULL, 0);
214 #else
215             result = OCDoResource(&handle, OC_REST_GET,
216                                   resourceType.c_str(),
217                                   nullptr, nullptr,
218                                   static_cast<OCQualityOfService>(QoS),
219                                   &cbdata,
220                                   NULL, 0);
221 #endif
222         }
223         else
224         {
225             delete context;
226             result = OC_STACK_ERROR;
227         }
228         return result;
229     }
230
231     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
232             OCClientResponse* clientResponse)
233     {
234         ClientCallbackContext::DeviceListenContext* context =
235             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
236
237         OCRepresentation rep = parseGetSetCallback(clientResponse);
238         std::thread exec(context->callback, rep);
239         exec.detach();
240
241         return OC_STACK_KEEP_TRANSACTION;
242     }
243
244 #ifdef CA_INT
245     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
246         const std::string& deviceURI, uint8_t connectivityType,
247         FindDeviceCallback& callback, QualityOfService QoS)
248 #else
249     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
250         const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
251 #endif
252     {
253         OCStackResult result;
254
255         OCCallbackData cbdata = {0};
256
257         ClientCallbackContext::DeviceListenContext* context =
258             new ClientCallbackContext::DeviceListenContext();
259         context->callback = callback;
260         context->clientWrapper = shared_from_this();
261
262         cbdata.context =  static_cast<void*>(context);
263         cbdata.cb = listenDeviceCallback;
264         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
265
266         auto cLock = m_csdkLock.lock();
267         if(cLock)
268         {
269             std::lock_guard<std::recursive_mutex> lock(*cLock);
270             OCDoHandle handle;
271 #ifdef CA_INT
272             result = OCDoResource(&handle, OC_REST_GET,
273                                   deviceURI.c_str(),
274                                   nullptr, nullptr, connectivityType,
275                                   static_cast<OCQualityOfService>(QoS),
276                                   &cbdata,
277                                   NULL, 0);
278 #else
279             result = OCDoResource(&handle, OC_REST_GET,
280                                   deviceURI.c_str(),
281                                   nullptr, nullptr,
282                                   static_cast<OCQualityOfService>(QoS),
283                                   &cbdata,
284                                   NULL, 0);
285 #endif
286         }
287         else
288         {
289             result = OC_STACK_ERROR;
290         }
291         return result;
292     }
293
294     void parseServerHeaderOptions(OCClientResponse* clientResponse,
295                     HeaderOptions& serverHeaderOptions)
296     {
297         if(clientResponse)
298         {
299             // Parse header options from server
300             uint16_t optionID;
301             std::string optionData;
302
303             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
304             {
305                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
306                 optionData = reinterpret_cast<const char*>
307                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
308                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
309                 serverHeaderOptions.push_back(headerOption);
310             }
311         }
312         else
313         {
314             // clientResponse is invalid
315             // TODO check proper logging
316             std::cout << " Invalid response " << std::endl;
317         }
318     }
319
320     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
321         OCClientResponse* clientResponse)
322     {
323         ClientCallbackContext::GetContext* context =
324             static_cast<ClientCallbackContext::GetContext*>(ctx);
325
326         OCRepresentation rep;
327         HeaderOptions serverHeaderOptions;
328         if(clientResponse->result == OC_STACK_OK)
329         {
330             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
331             rep = parseGetSetCallback(clientResponse);
332         }
333
334         std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
335         exec.detach();
336         return OC_STACK_DELETE_TRANSACTION;
337     }
338
339 #ifdef CA_INT
340     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
341         const std::string& uri, uint8_t connectivityType, const QueryParamsMap& queryParams,
342         const HeaderOptions& headerOptions, GetCallback& callback,
343         QualityOfService QoS)
344 #else
345     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
346         const std::string& uri, const QueryParamsMap& queryParams,
347         const HeaderOptions& headerOptions, GetCallback& callback,
348         QualityOfService QoS)
349 #endif
350     {
351         OCStackResult result;
352         OCCallbackData cbdata = {0};
353
354         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
355         ctx->callback = callback;
356         cbdata.context = static_cast<void*>(ctx);
357         cbdata.cb = &getResourceCallback;
358         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
359
360         auto cLock = m_csdkLock.lock();
361
362         if(cLock)
363         {
364             std::ostringstream os;
365             os << host << assembleSetResourceUri(uri, queryParams).c_str();
366
367             std::lock_guard<std::recursive_mutex> lock(*cLock);
368             OCDoHandle handle;
369             OCHeaderOption options[MAX_HEADER_OPTIONS];
370
371             assembleHeaderOptions(options, headerOptions);
372 #ifdef CA_INT
373             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
374                                   nullptr, nullptr, connectivityType,
375                                   static_cast<OCQualityOfService>(QoS),
376                                   &cbdata,
377                                   options, headerOptions.size());
378 #else
379             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
380                                   nullptr, nullptr,
381                                   static_cast<OCQualityOfService>(QoS),
382                                   &cbdata,
383                                   options, headerOptions.size());
384 #endif
385         }
386         else
387         {
388             delete ctx;
389             result = OC_STACK_ERROR;
390         }
391         return result;
392     }
393
394
395     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
396         OCClientResponse* clientResponse)
397     {
398         ClientCallbackContext::SetContext* context =
399             static_cast<ClientCallbackContext::SetContext*>(ctx);
400         OCRepresentation attrs;
401         HeaderOptions serverHeaderOptions;
402
403         if (OC_STACK_OK               == clientResponse->result ||
404             OC_STACK_RESOURCE_CREATED == clientResponse->result ||
405             OC_STACK_RESOURCE_DELETED == clientResponse->result)
406         {
407             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
408             attrs = parseGetSetCallback(clientResponse);
409         }
410
411         std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
412         exec.detach();
413         return OC_STACK_DELETE_TRANSACTION;
414     }
415
416     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
417         const QueryParamsMap& queryParams)
418     {
419         if(uri.back() == '/')
420         {
421             uri.resize(uri.size()-1);
422         }
423
424         ostringstream paramsList;
425         if(queryParams.size() > 0)
426         {
427             paramsList << '?';
428         }
429
430         for(auto& param : queryParams)
431         {
432             paramsList << param.first <<'='<<param.second<<'&';
433         }
434
435         std::string queryString = paramsList.str();
436         if(queryString.back() == '&')
437         {
438             queryString.resize(queryString.size() - 1);
439         }
440
441         std::string ret = uri + queryString;
442         return ret;
443     }
444
445     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
446     {
447         MessageContainer ocInfo;
448         ocInfo.addRepresentation(rep);
449         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
450     }
451
452 #ifdef CA_INT
453     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
454         const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
455         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
456         PostCallback& callback, QualityOfService QoS)
457 #else
458     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
459         const std::string& uri, const OCRepresentation& rep,
460         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
461         PostCallback& callback, QualityOfService QoS)
462 #endif
463     {
464         OCStackResult result;
465         OCCallbackData cbdata = {0};
466
467         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
468         ctx->callback = callback;
469         cbdata.cb = &setResourceCallback;
470         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
471         cbdata.context = static_cast<void*>(ctx);
472
473         // TODO: in the future the cstack should be combining these two strings!
474         ostringstream os;
475         os << host << assembleSetResourceUri(uri, queryParams).c_str();
476         // TODO: end of above
477
478         auto cLock = m_csdkLock.lock();
479
480         if(cLock)
481         {
482             std::lock_guard<std::recursive_mutex> lock(*cLock);
483             OCHeaderOption options[MAX_HEADER_OPTIONS];
484             OCDoHandle handle;
485
486             assembleHeaderOptions(options, headerOptions);
487 #ifdef CA_INT
488             result = OCDoResource(&handle, OC_REST_POST,
489                                   os.str().c_str(), nullptr,
490                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
491                                   static_cast<OCQualityOfService>(QoS),
492                                   &cbdata, options, headerOptions.size());
493 #else
494             result = OCDoResource(&handle, OC_REST_POST,
495                                   os.str().c_str(), nullptr,
496                                   assembleSetResourcePayload(rep).c_str(),
497                                   static_cast<OCQualityOfService>(QoS),
498                                   &cbdata, options, headerOptions.size());
499 #endif
500         }
501         else
502         {
503             delete ctx;
504             result = OC_STACK_ERROR;
505         }
506
507         return result;
508     }
509
510 #ifdef CA_INT
511     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
512         const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
513         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
514         PutCallback& callback, QualityOfService QoS)
515 #else
516     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
517         const std::string& uri, const OCRepresentation& rep,
518         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
519         PutCallback& callback, QualityOfService QoS)
520 #endif
521     {
522         OCStackResult result;
523         OCCallbackData cbdata = {0};
524
525         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
526         ctx->callback = callback;
527         cbdata.cb = &setResourceCallback;
528         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
529         cbdata.context = static_cast<void*>(ctx);
530
531         // TODO: in the future the cstack should be combining these two strings!
532         ostringstream os;
533         os << host << assembleSetResourceUri(uri, queryParams).c_str();
534         // TODO: end of above
535
536         auto cLock = m_csdkLock.lock();
537
538         if(cLock)
539         {
540             std::lock_guard<std::recursive_mutex> lock(*cLock);
541             OCDoHandle handle;
542             OCHeaderOption options[MAX_HEADER_OPTIONS];
543
544             assembleHeaderOptions(options, headerOptions);
545 #ifdef CA_INT
546             result = OCDoResource(&handle, OC_REST_PUT,
547                                   os.str().c_str(), nullptr,
548                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
549                                   static_cast<OCQualityOfService>(QoS),
550                                   &cbdata,
551                                   options, headerOptions.size());
552 #else
553             result = OCDoResource(&handle, OC_REST_PUT,
554                                   os.str().c_str(), nullptr,
555                                   assembleSetResourcePayload(rep).c_str(),
556                                   static_cast<OCQualityOfService>(QoS),
557                                   &cbdata,
558                                   options, headerOptions.size());
559 #endif
560         }
561         else
562         {
563             delete ctx;
564             result = OC_STACK_ERROR;
565         }
566
567         return result;
568     }
569
570     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
571         OCClientResponse* clientResponse)
572     {
573         ClientCallbackContext::DeleteContext* context =
574             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
575         HeaderOptions serverHeaderOptions;
576
577         if(clientResponse->result == OC_STACK_OK)
578         {
579             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
580         }
581         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
582         exec.detach();
583         return OC_STACK_DELETE_TRANSACTION;
584     }
585
586 #ifdef CA_INT
587     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
588         const std::string& uri, uint8_t connectivityType, const HeaderOptions& headerOptions,
589          DeleteCallback& callback, QualityOfService QoS)
590 #else
591     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
592         const std::string& uri, const HeaderOptions& headerOptions,
593          DeleteCallback& callback, QualityOfService QoS)
594 #endif
595     {
596         OCStackResult result;
597         OCCallbackData cbdata = {0};
598
599         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
600         ctx->callback = callback;
601         cbdata.cb = &deleteResourceCallback;
602         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
603         cbdata.context = static_cast<void*>(ctx);
604
605         ostringstream os;
606         os << host << uri;
607
608         auto cLock = m_csdkLock.lock();
609
610         if(cLock)
611         {
612             OCHeaderOption options[MAX_HEADER_OPTIONS];
613             OCDoHandle handle;
614
615             assembleHeaderOptions(options, headerOptions);
616
617             std::lock_guard<std::recursive_mutex> lock(*cLock);
618 #ifdef CA_INT
619             result = OCDoResource(&handle, OC_REST_DELETE,
620                                   os.str().c_str(), nullptr,
621                                   nullptr, connectivityType,
622                                   static_cast<OCQualityOfService>(m_cfg.QoS),
623                                   &cbdata, options, headerOptions.size());
624 #else
625             result = OCDoResource(&handle, OC_REST_DELETE,
626                                   os.str().c_str(), nullptr,
627                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
628                                   &cbdata, options, headerOptions.size());
629 #endif
630         }
631         else
632         {
633             delete ctx;
634             result = OC_STACK_ERROR;
635         }
636
637         return result;
638     }
639
640     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
641         OCClientResponse* clientResponse)
642     {
643         ClientCallbackContext::ObserveContext* context =
644             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
645         OCRepresentation attrs;
646         HeaderOptions serverHeaderOptions;
647         uint32_t sequenceNumber = clientResponse->sequenceNumber;
648
649         if(clientResponse->result == OC_STACK_OK)
650         {
651             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
652             attrs = parseGetSetCallback(clientResponse);
653         }
654         std::thread exec(context->callback, serverHeaderOptions, attrs,
655                     clientResponse->result, sequenceNumber);
656         exec.detach();
657         return OC_STACK_KEEP_TRANSACTION;
658     }
659
660 #ifdef CA_INT
661     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
662         const std::string& host, const std::string& uri, uint8_t connectivityType,
663         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
664         ObserveCallback& callback, QualityOfService QoS)
665 #else
666     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
667         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
668         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
669 #endif
670     {
671         OCStackResult result;
672         OCCallbackData cbdata = {0};
673
674         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
675         ctx->callback = callback;
676         cbdata.context = static_cast<void*>(ctx);
677         cbdata.cb = &observeResourceCallback;
678         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
679
680         OCMethod method;
681         if (observeType == ObserveType::Observe)
682         {
683             method = OC_REST_OBSERVE;
684         }
685         else if (observeType == ObserveType::ObserveAll)
686         {
687             method = OC_REST_OBSERVE_ALL;
688         }
689         else
690         {
691             method = OC_REST_OBSERVE_ALL;
692         }
693
694         auto cLock = m_csdkLock.lock();
695
696         if(cLock)
697         {
698             std::ostringstream os;
699             os << host << assembleSetResourceUri(uri, queryParams).c_str();
700
701             std::lock_guard<std::recursive_mutex> lock(*cLock);
702             OCHeaderOption options[MAX_HEADER_OPTIONS];
703
704             assembleHeaderOptions(options, headerOptions);
705 #ifdef CA_INT
706             result = OCDoResource(handle, method,
707                                   os.str().c_str(), nullptr,
708                                   nullptr, connectivityType,
709                                   static_cast<OCQualityOfService>(QoS),
710                                   &cbdata,
711                                   options, headerOptions.size());
712 #else
713             result = OCDoResource(handle, method,
714                                   os.str().c_str(), nullptr,
715                                   nullptr,
716                                   static_cast<OCQualityOfService>(QoS),
717                                   &cbdata,
718                                   options, headerOptions.size());
719 #endif
720         }
721         else
722         {
723             delete ctx;
724             return OC_STACK_ERROR;
725         }
726
727         return result;
728     }
729
730     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
731         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
732         QualityOfService QoS)
733     {
734         OCStackResult result;
735         auto cLock = m_csdkLock.lock();
736
737         if(cLock)
738         {
739             std::lock_guard<std::recursive_mutex> lock(*cLock);
740             OCHeaderOption options[MAX_HEADER_OPTIONS];
741
742             assembleHeaderOptions(options, headerOptions);
743             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
744                     headerOptions.size());
745         }
746         else
747         {
748             result = OC_STACK_ERROR;
749         }
750
751         return result;
752     }
753
754     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
755             OCClientResponse* clientResponse)
756     {
757         char stringAddress[DEV_ADDR_SIZE_MAX];
758         ostringstream os;
759         uint16_t port;
760
761         if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
762                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
763         {
764             os<<stringAddress<<":"<<port;
765
766             ClientCallbackContext::SubscribePresenceContext* context =
767                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
768
769             std::thread exec(context->callback, clientResponse->result,
770                     clientResponse->sequenceNumber, os.str());
771
772             exec.detach();
773         }
774         else
775         {
776             oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
777                     <<"failed"<< std::flush;
778         }
779         return OC_STACK_KEEP_TRANSACTION;
780     }
781
782 #ifdef CA_INT
783     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
784         const std::string& host, const std::string& resourceType, uint8_t connectivityType,
785         SubscribeCallback& presenceHandler)
786 #else
787     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
788         const std::string& host, const std::string& resourceType,
789         SubscribeCallback& presenceHandler)
790 #endif
791     {
792         OCCallbackData cbdata = {0};
793
794         ClientCallbackContext::SubscribePresenceContext* ctx =
795             new ClientCallbackContext::SubscribePresenceContext();
796         ctx->callback = presenceHandler;
797         cbdata.cb = &subscribePresenceCallback;
798         cbdata.context = static_cast<void*>(ctx);
799         cbdata.cd = [](void* c)
800             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
801         auto cLock = m_csdkLock.lock();
802
803         std::ostringstream os;
804         os << host << "/oc/presence";
805
806         if(!resourceType.empty())
807         {
808             os << "?rt=" << resourceType;
809         }
810
811         if(!cLock)
812         {
813             delete ctx;
814             return OC_STACK_ERROR;
815         }
816
817 #ifdef CA_INT
818         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
819                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
820 #else
821         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
822                             OC_LOW_QOS, &cbdata, NULL, 0);
823 #endif
824     }
825
826     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
827     {
828         OCStackResult result;
829         auto cLock = m_csdkLock.lock();
830
831         if(cLock)
832         {
833             std::lock_guard<std::recursive_mutex> lock(*cLock);
834             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
835         }
836         else
837         {
838             result = OC_STACK_ERROR;
839         }
840
841         return result;
842     }
843
844     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
845     {
846         qos = m_cfg.QoS;
847         return OC_STACK_OK;
848     }
849
850     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
851            const HeaderOptions& headerOptions)
852     {
853         int i = 0;
854
855         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
856         {
857             options[i].protocolID = OC_COAP_ID;
858             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
859             options[i].optionLength = (it->getOptionData()).length() + 1;
860             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
861                     (it->getOptionData()).length() + 1);
862             i++;
863         }
864     }
865 }