Merge from master to CA branch
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             throw OCException(OC::Exception::STR_NULL_RESPONSE, OC_STACK_ERROR);
100         }
101
102         MessageContainer oc;
103         oc.setJSONRepresentation(clientResponse->resJSONPayload);
104
105         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
106         if(it == oc.representations().end())
107         {
108             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_ERROR);
109         }
110
111         // first one is considered the root, everything else is considered a child of this one.
112         OCRepresentation root = *it;
113         ++it;
114
115         std::for_each(it, oc.representations().end(),
116                 [&root](const OCRepresentation& repItr)
117                 {root.addChild(repItr);});
118         return root;
119
120     }
121
122     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
123         OCClientResponse* clientResponse)
124     {
125         ClientCallbackContext::ListenContext* context =
126             static_cast<ClientCallbackContext::ListenContext*>(ctx);
127
128         if(clientResponse->result != OC_STACK_OK)
129         {
130             oclog() << "listenCallback(): failed to create resource. clientResponse: "
131                     << clientResponse->result
132                     << std::flush;
133
134             return OC_STACK_KEEP_TRANSACTION;
135         }
136
137         auto clientWrapper = context->clientWrapper.lock();
138
139         if(!clientWrapper)
140         {
141             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
142                     << std::flush;
143             return OC_STACK_KEEP_TRANSACTION;
144         }
145
146         std::stringstream requestStream;
147         requestStream << clientResponse->resJSONPayload;
148
149         try
150         {
151
152 #ifdef CA_INT
153             ListenOCContainer container(clientWrapper, *clientResponse->addr,
154                     clientResponse->connType, requestStream);
155 #else
156             ListenOCContainer container(clientWrapper, *clientResponse->addr,
157                     requestStream);
158 #endif
159             // loop to ensure valid construction of all resources
160             for(auto resource : container.Resources())
161             {
162                 std::thread exec(context->callback, resource);
163                 exec.detach();
164             }
165
166         }
167         catch(const std::exception& e)
168         {
169             oclog() << "listenCallback failed to parse a malformed message: "
170                     << e.what()
171                     << std::endl
172                     << clientResponse->resJSONPayload
173                     << std::endl
174                     << clientResponse->result
175                     << std::flush;
176             return OC_STACK_KEEP_TRANSACTION;
177         }
178
179         return OC_STACK_KEEP_TRANSACTION;
180     }
181
182 #ifdef CA_INT
183     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
184         const std::string& resourceType, uint8_t connectivityType,
185         FindCallback& callback, QualityOfService QoS)
186 #else
187     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
188         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
189 #endif
190     {
191         OCStackResult result;
192
193         OCCallbackData cbdata = {0};
194
195         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
196         context->callback = callback;
197         context->clientWrapper = shared_from_this();
198
199         cbdata.context =  static_cast<void*>(context);
200         cbdata.cb = listenCallback;
201         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
202
203         auto cLock = m_csdkLock.lock();
204         if(cLock)
205         {
206             std::lock_guard<std::recursive_mutex> lock(*cLock);
207             OCDoHandle handle;
208 #ifdef CA_INT
209             result = OCDoResource(&handle, OC_REST_GET,
210                                   resourceType.c_str(),
211                                   nullptr, nullptr, connectivityType,
212                                   static_cast<OCQualityOfService>(QoS),
213                                   &cbdata,
214                                   NULL, 0);
215 #else
216             result = OCDoResource(&handle, OC_REST_GET,
217                                   resourceType.c_str(),
218                                   nullptr, nullptr,
219                                   static_cast<OCQualityOfService>(QoS),
220                                   &cbdata,
221                                   NULL, 0);
222 #endif
223         }
224         else
225         {
226             delete context;
227             result = OC_STACK_ERROR;
228         }
229         return result;
230     }
231
232     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
233             OCClientResponse* clientResponse)
234     {
235         ClientCallbackContext::DeviceListenContext* context =
236             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
237
238         OCRepresentation rep = parseGetSetCallback(clientResponse);
239         std::thread exec(context->callback, rep);
240         exec.detach();
241
242         return OC_STACK_KEEP_TRANSACTION;
243     }
244
245 #ifdef CA_INT
246     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
247         const std::string& deviceURI, uint8_t connectivityType,
248         FindDeviceCallback& callback, QualityOfService QoS)
249 #else
250     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
251         const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
252 #endif
253     {
254         OCStackResult result;
255
256         OCCallbackData cbdata = {0};
257         ClientCallbackContext::DeviceListenContext* context =
258             new ClientCallbackContext::DeviceListenContext();
259         context->callback = callback;
260         context->clientWrapper = shared_from_this();
261         cbdata.context =  static_cast<void*>(context);
262         cbdata.cb = listenDeviceCallback;
263         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
264
265         auto cLock = m_csdkLock.lock();
266         if(cLock)
267         {
268             std::lock_guard<std::recursive_mutex> lock(*cLock);
269             OCDoHandle handle;
270 #ifdef CA_INT
271             result = OCDoResource(&handle, OC_REST_GET,
272                                   deviceURI.c_str(),
273                                   nullptr, nullptr, connectivityType,
274                                   static_cast<OCQualityOfService>(QoS),
275                                   &cbdata,
276                                   NULL, 0);
277 #else
278             result = OCDoResource(&handle, OC_REST_GET,
279                                   deviceURI.c_str(),
280                                   nullptr, nullptr,
281                                   static_cast<OCQualityOfService>(QoS),
282                                   &cbdata,
283                                   NULL, 0);
284 #endif
285         }
286         else
287         {
288             result = OC_STACK_ERROR;
289         }
290         return result;
291     }
292
293     void parseServerHeaderOptions(OCClientResponse* clientResponse,
294                     HeaderOptions& serverHeaderOptions)
295     {
296         if(clientResponse)
297         {
298             // Parse header options from server
299             uint16_t optionID;
300             std::string optionData;
301
302             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
303             {
304                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
305                 optionData = reinterpret_cast<const char*>
306                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
307                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
308                 serverHeaderOptions.push_back(headerOption);
309             }
310         }
311         else
312         {
313             // clientResponse is invalid
314             // TODO check proper logging
315             std::cout << " Invalid response " << std::endl;
316         }
317     }
318
319     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
320         OCClientResponse* clientResponse)
321     {
322         ClientCallbackContext::GetContext* context =
323             static_cast<ClientCallbackContext::GetContext*>(ctx);
324
325         OCRepresentation rep;
326         HeaderOptions serverHeaderOptions;
327         if(clientResponse->result == OC_STACK_OK)
328         {
329             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
330             rep = parseGetSetCallback(clientResponse);
331         }
332
333         std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
334         exec.detach();
335         return OC_STACK_DELETE_TRANSACTION;
336     }
337
338 #ifdef CA_INT
339     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
340         const std::string& uri, uint8_t connectivityType, const QueryParamsMap& queryParams,
341         const HeaderOptions& headerOptions, GetCallback& callback,
342         QualityOfService QoS)
343 #else
344     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
345         const std::string& uri, const QueryParamsMap& queryParams,
346         const HeaderOptions& headerOptions, GetCallback& callback,
347         QualityOfService QoS)
348 #endif
349     {
350         OCStackResult result;
351         OCCallbackData cbdata = {0};
352
353         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
354         ctx->callback = callback;
355         cbdata.context = static_cast<void*>(ctx);
356         cbdata.cb = &getResourceCallback;
357         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
358
359         auto cLock = m_csdkLock.lock();
360
361         if(cLock)
362         {
363             std::ostringstream os;
364             os << host << assembleSetResourceUri(uri, queryParams).c_str();
365
366             std::lock_guard<std::recursive_mutex> lock(*cLock);
367             OCDoHandle handle;
368             OCHeaderOption options[MAX_HEADER_OPTIONS];
369
370             assembleHeaderOptions(options, headerOptions);
371 #ifdef CA_INT
372             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
373                                   nullptr, nullptr, connectivityType,
374                                   static_cast<OCQualityOfService>(QoS),
375                                   &cbdata,
376                                   options, headerOptions.size());
377 #else
378             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
379                                   nullptr, nullptr,
380                                   static_cast<OCQualityOfService>(QoS),
381                                   &cbdata,
382                                   options, headerOptions.size());
383 #endif
384         }
385         else
386         {
387             delete ctx;
388             result = OC_STACK_ERROR;
389         }
390         return result;
391     }
392
393
394     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
395         OCClientResponse* clientResponse)
396     {
397         ClientCallbackContext::SetContext* context =
398             static_cast<ClientCallbackContext::SetContext*>(ctx);
399         OCRepresentation attrs;
400         HeaderOptions serverHeaderOptions;
401
402         if (OC_STACK_OK               == clientResponse->result ||
403             OC_STACK_RESOURCE_CREATED == clientResponse->result ||
404             OC_STACK_RESOURCE_DELETED == clientResponse->result)
405         {
406             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
407             attrs = parseGetSetCallback(clientResponse);
408         }
409
410         std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
411         exec.detach();
412         return OC_STACK_DELETE_TRANSACTION;
413     }
414
415     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
416         const QueryParamsMap& queryParams)
417     {
418         if(uri.back() == '/')
419         {
420             uri.resize(uri.size()-1);
421         }
422
423         ostringstream paramsList;
424         if(queryParams.size() > 0)
425         {
426             paramsList << '?';
427         }
428
429         for(auto& param : queryParams)
430         {
431             paramsList << param.first <<'='<<param.second<<'&';
432         }
433
434         std::string queryString = paramsList.str();
435         if(queryString.back() == '&')
436         {
437             queryString.resize(queryString.size() - 1);
438         }
439
440         std::string ret = uri + queryString;
441         return ret;
442     }
443
444     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
445     {
446         MessageContainer ocInfo;
447         ocInfo.addRepresentation(rep);
448         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
449     }
450
451 #ifdef CA_INT
452     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
453         const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
454         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
455         PostCallback& callback, QualityOfService QoS)
456 #else
457     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
458         const std::string& uri, const OCRepresentation& rep,
459         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
460         PostCallback& callback, QualityOfService QoS)
461 #endif
462     {
463         OCStackResult result;
464         OCCallbackData cbdata = {0};
465
466         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
467         ctx->callback = callback;
468         cbdata.cb = &setResourceCallback;
469         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
470         cbdata.context = static_cast<void*>(ctx);
471
472         // TODO: in the future the cstack should be combining these two strings!
473         ostringstream os;
474         os << host << assembleSetResourceUri(uri, queryParams).c_str();
475         // TODO: end of above
476
477         auto cLock = m_csdkLock.lock();
478
479         if(cLock)
480         {
481             std::lock_guard<std::recursive_mutex> lock(*cLock);
482             OCHeaderOption options[MAX_HEADER_OPTIONS];
483             OCDoHandle handle;
484
485             assembleHeaderOptions(options, headerOptions);
486 #ifdef CA_INT
487             result = OCDoResource(&handle, OC_REST_POST,
488                                   os.str().c_str(), nullptr,
489                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
490                                   static_cast<OCQualityOfService>(QoS),
491                                   &cbdata, options, headerOptions.size());
492 #else
493             result = OCDoResource(&handle, OC_REST_POST,
494                                   os.str().c_str(), nullptr,
495                                   assembleSetResourcePayload(rep).c_str(),
496                                   static_cast<OCQualityOfService>(QoS),
497                                   &cbdata, options, headerOptions.size());
498 #endif
499         }
500         else
501         {
502             delete ctx;
503             result = OC_STACK_ERROR;
504         }
505
506         return result;
507     }
508
509 #ifdef CA_INT
510     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
511         const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
512         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
513         PutCallback& callback, QualityOfService QoS)
514 #else
515     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
516         const std::string& uri, const OCRepresentation& rep,
517         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
518         PutCallback& callback, QualityOfService QoS)
519 #endif
520     {
521         OCStackResult result;
522         OCCallbackData cbdata = {0};
523
524         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
525         ctx->callback = callback;
526         cbdata.cb = &setResourceCallback;
527         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
528         cbdata.context = static_cast<void*>(ctx);
529
530         // TODO: in the future the cstack should be combining these two strings!
531         ostringstream os;
532         os << host << assembleSetResourceUri(uri, queryParams).c_str();
533         // TODO: end of above
534
535         auto cLock = m_csdkLock.lock();
536
537         if(cLock)
538         {
539             std::lock_guard<std::recursive_mutex> lock(*cLock);
540             OCDoHandle handle;
541             OCHeaderOption options[MAX_HEADER_OPTIONS];
542
543             assembleHeaderOptions(options, headerOptions);
544 #ifdef CA_INT
545             result = OCDoResource(&handle, OC_REST_PUT,
546                                   os.str().c_str(), nullptr,
547                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
548                                   static_cast<OCQualityOfService>(QoS),
549                                   &cbdata,
550                                   options, headerOptions.size());
551 #else
552             result = OCDoResource(&handle, OC_REST_PUT,
553                                   os.str().c_str(), nullptr,
554                                   assembleSetResourcePayload(rep).c_str(),
555                                   static_cast<OCQualityOfService>(QoS),
556                                   &cbdata,
557                                   options, headerOptions.size());
558 #endif
559         }
560         else
561         {
562             delete ctx;
563             result = OC_STACK_ERROR;
564         }
565
566         return result;
567     }
568
569     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
570         OCClientResponse* clientResponse)
571     {
572         ClientCallbackContext::DeleteContext* context =
573             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
574         HeaderOptions serverHeaderOptions;
575
576         if(clientResponse->result == OC_STACK_OK)
577         {
578             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
579         }
580         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
581         exec.detach();
582         return OC_STACK_DELETE_TRANSACTION;
583     }
584
585 #ifdef CA_INT
586     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
587         const std::string& uri, uint8_t connectivityType, const HeaderOptions& headerOptions,
588          DeleteCallback& callback, QualityOfService QoS)
589 #else
590     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
591         const std::string& uri, const HeaderOptions& headerOptions,
592          DeleteCallback& callback, QualityOfService QoS)
593 #endif
594     {
595         OCStackResult result;
596         OCCallbackData cbdata = {0};
597
598         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
599         ctx->callback = callback;
600         cbdata.cb = &deleteResourceCallback;
601         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
602         cbdata.context = static_cast<void*>(ctx);
603
604         ostringstream os;
605         os << host << uri;
606
607         auto cLock = m_csdkLock.lock();
608
609         if(cLock)
610         {
611             OCHeaderOption options[MAX_HEADER_OPTIONS];
612             OCDoHandle handle;
613
614             assembleHeaderOptions(options, headerOptions);
615
616             std::lock_guard<std::recursive_mutex> lock(*cLock);
617 #ifdef CA_INT
618             result = OCDoResource(&handle, OC_REST_DELETE,
619                                   os.str().c_str(), nullptr,
620                                   nullptr, connectivityType,
621                                   static_cast<OCQualityOfService>(m_cfg.QoS),
622                                   &cbdata, options, headerOptions.size());
623 #else
624             result = OCDoResource(&handle, OC_REST_DELETE,
625                                   os.str().c_str(), nullptr,
626                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
627                                   &cbdata, options, headerOptions.size());
628 #endif
629         }
630         else
631         {
632             delete ctx;
633             result = OC_STACK_ERROR;
634         }
635
636         return result;
637     }
638
639     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
640         OCClientResponse* clientResponse)
641     {
642         ClientCallbackContext::ObserveContext* context =
643             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
644         OCRepresentation attrs;
645         HeaderOptions serverHeaderOptions;
646         uint32_t sequenceNumber = clientResponse->sequenceNumber;
647
648         if(clientResponse->result == OC_STACK_OK)
649         {
650             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
651             attrs = parseGetSetCallback(clientResponse);
652         }
653         std::thread exec(context->callback, serverHeaderOptions, attrs,
654                     clientResponse->result, sequenceNumber);
655         exec.detach();
656         return OC_STACK_KEEP_TRANSACTION;
657     }
658
659 #ifdef CA_INT
660     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
661         const std::string& host, const std::string& uri, uint8_t connectivityType,
662         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
663         ObserveCallback& callback, QualityOfService QoS)
664 #else
665     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
666         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
667         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
668 #endif
669     {
670         OCStackResult result;
671         OCCallbackData cbdata = {0};
672
673         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
674         ctx->callback = callback;
675         cbdata.context = static_cast<void*>(ctx);
676         cbdata.cb = &observeResourceCallback;
677         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
678
679         OCMethod method;
680         if (observeType == ObserveType::Observe)
681         {
682             method = OC_REST_OBSERVE;
683         }
684         else if (observeType == ObserveType::ObserveAll)
685         {
686             method = OC_REST_OBSERVE_ALL;
687         }
688         else
689         {
690             method = OC_REST_OBSERVE_ALL;
691         }
692
693         auto cLock = m_csdkLock.lock();
694
695         if(cLock)
696         {
697             std::ostringstream os;
698             os << host << assembleSetResourceUri(uri, queryParams).c_str();
699
700             std::lock_guard<std::recursive_mutex> lock(*cLock);
701             OCHeaderOption options[MAX_HEADER_OPTIONS];
702
703             assembleHeaderOptions(options, headerOptions);
704 #ifdef CA_INT
705             result = OCDoResource(handle, method,
706                                   os.str().c_str(), nullptr,
707                                   nullptr, connectivityType,
708                                   static_cast<OCQualityOfService>(QoS),
709                                   &cbdata,
710                                   options, headerOptions.size());
711 #else
712             result = OCDoResource(handle, method,
713                                   os.str().c_str(), nullptr,
714                                   nullptr,
715                                   static_cast<OCQualityOfService>(QoS),
716                                   &cbdata,
717                                   options, headerOptions.size());
718 #endif
719         }
720         else
721         {
722             delete ctx;
723             return OC_STACK_ERROR;
724         }
725
726         return result;
727     }
728
729     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
730         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
731         QualityOfService QoS)
732     {
733         OCStackResult result;
734         auto cLock = m_csdkLock.lock();
735
736         if(cLock)
737         {
738             std::lock_guard<std::recursive_mutex> lock(*cLock);
739             OCHeaderOption options[MAX_HEADER_OPTIONS];
740
741             assembleHeaderOptions(options, headerOptions);
742             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
743                     headerOptions.size());
744         }
745         else
746         {
747             result = OC_STACK_ERROR;
748         }
749
750         return result;
751     }
752
753     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
754             OCClientResponse* clientResponse)
755     {
756         char stringAddress[DEV_ADDR_SIZE_MAX];
757         ostringstream os;
758         uint16_t port;
759
760         if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
761                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
762         {
763             os<<stringAddress<<":"<<port;
764
765             ClientCallbackContext::SubscribePresenceContext* context =
766                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
767
768             std::thread exec(context->callback, clientResponse->result,
769                     clientResponse->sequenceNumber, os.str());
770
771             exec.detach();
772         }
773         else
774         {
775             oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
776                     <<"failed"<< std::flush;
777         }
778         return OC_STACK_KEEP_TRANSACTION;
779     }
780
781 #ifdef CA_INT
782     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
783         const std::string& host, const std::string& resourceType, uint8_t connectivityType,
784         SubscribeCallback& presenceHandler)
785 #else
786     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
787         const std::string& host, const std::string& resourceType,
788         SubscribeCallback& presenceHandler)
789 #endif
790     {
791         OCCallbackData cbdata = {0};
792
793         ClientCallbackContext::SubscribePresenceContext* ctx =
794             new ClientCallbackContext::SubscribePresenceContext();
795         ctx->callback = presenceHandler;
796         cbdata.cb = &subscribePresenceCallback;
797         cbdata.context = static_cast<void*>(ctx);
798         cbdata.cd = [](void* c)
799             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
800         auto cLock = m_csdkLock.lock();
801
802         std::ostringstream os;
803         os << host << "/oc/presence";
804
805         if(!resourceType.empty())
806         {
807             os << "?rt=" << resourceType;
808         }
809
810         if(!cLock)
811         {
812             delete ctx;
813             return OC_STACK_ERROR;
814         }
815
816 #ifdef CA_INT
817         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
818                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
819 #else
820         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
821                             OC_LOW_QOS, &cbdata, NULL, 0);
822 #endif
823     }
824
825     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
826     {
827         OCStackResult result;
828         auto cLock = m_csdkLock.lock();
829
830         if(cLock)
831         {
832             std::lock_guard<std::recursive_mutex> lock(*cLock);
833             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
834         }
835         else
836         {
837             result = OC_STACK_ERROR;
838         }
839
840         return result;
841     }
842
843     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
844     {
845         qos = m_cfg.QoS;
846         return OC_STACK_OK;
847     }
848
849     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
850            const HeaderOptions& headerOptions)
851     {
852         int i = 0;
853
854         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
855         {
856             options[i].protocolID = OC_COAP_ID;
857             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
858             options[i].optionLength = (it->getOptionData()).length() + 1;
859             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
860                     (it->getOptionData()).length() + 1);
861             i++;
862         }
863     }
864 }