Implement ServerID parsing and sample app
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             throw OCException(OC::Exception::STR_NULL_RESPONSE, OC_STACK_ERROR);
100         }
101
102         MessageContainer oc;
103         oc.setJSONRepresentation(clientResponse->resJSONPayload);
104
105         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
106         if(it == oc.representations().end())
107         {
108             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_ERROR);
109         }
110
111         // first one is considered the root, everything else is considered a child of this one.
112         OCRepresentation root = *it;
113         ++it;
114
115         std::for_each(it, oc.representations().end(),
116                 [&root](const OCRepresentation& repItr)
117                 {root.addChild(repItr);});
118         return root;
119
120     }
121
122     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
123         OCClientResponse* clientResponse)
124     {
125         ClientCallbackContext::ListenContext* context =
126             static_cast<ClientCallbackContext::ListenContext*>(ctx);
127
128         if(clientResponse->result != OC_STACK_OK)
129         {
130             oclog() << "listenCallback(): failed to create resource. clientResponse: "
131                     << clientResponse->result
132                     << std::flush;
133
134             return OC_STACK_KEEP_TRANSACTION;
135         }
136
137         auto clientWrapper = context->clientWrapper.lock();
138
139         if(!clientWrapper)
140         {
141             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
142                     << std::flush;
143             return OC_STACK_KEEP_TRANSACTION;
144         }
145
146         std::stringstream requestStream;
147         requestStream << clientResponse->resJSONPayload;
148
149         try
150         {
151             ListenOCContainer container(clientWrapper, *clientResponse->addr,
152                     requestStream);
153
154             // loop to ensure valid construction of all resources
155             for(auto resource : container.Resources())
156             {
157                 std::thread exec(context->callback, resource);
158                 exec.detach();
159             }
160
161         }
162         catch(const std::exception& e)
163         {
164             oclog() << "listenCallback failed to parse a malformed message: "
165                     << e.what()
166                     << std::endl
167                     << clientResponse->resJSONPayload
168                     << std::endl
169                     << clientResponse->result
170                     << std::flush;
171             return OC_STACK_KEEP_TRANSACTION;
172         }
173
174         return OC_STACK_KEEP_TRANSACTION;
175     }
176
177 #ifdef CA_INT
178     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
179         const std::string& resourceType, uint8_t connectivityType,
180         FindCallback& callback, QualityOfService QoS)
181 #else
182     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
183         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
184 #endif
185     {
186         OCStackResult result;
187
188         OCCallbackData cbdata = {0};
189
190         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
191         context->callback = callback;
192         context->clientWrapper = shared_from_this();
193
194         cbdata.context =  static_cast<void*>(context);
195         cbdata.cb = listenCallback;
196         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
197
198         auto cLock = m_csdkLock.lock();
199         if(cLock)
200         {
201             std::lock_guard<std::recursive_mutex> lock(*cLock);
202             OCDoHandle handle;
203 #ifdef CA_INT
204             result = OCDoResource(&handle, OC_REST_GET,
205                                   resourceType.c_str(),
206                                   nullptr, nullptr, connectivityType,
207                                   static_cast<OCQualityOfService>(QoS),
208                                   &cbdata,
209                                   NULL, 0);
210 #else
211             result = OCDoResource(&handle, OC_REST_GET,
212                                   resourceType.c_str(),
213                                   nullptr, nullptr,
214                                   static_cast<OCQualityOfService>(QoS),
215                                   &cbdata,
216                                   NULL, 0);
217 #endif
218         }
219         else
220         {
221             delete context;
222             result = OC_STACK_ERROR;
223         }
224         return result;
225     }
226
227     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
228             OCClientResponse* clientResponse)
229     {
230         ClientCallbackContext::DeviceListenContext* context =
231             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
232
233         OCRepresentation rep = parseGetSetCallback(clientResponse);
234         std::thread exec(context->callback, rep);
235         exec.detach();
236
237         return OC_STACK_KEEP_TRANSACTION;
238     }
239
240 #ifdef CA_INT
241     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
242         const std::string& deviceURI, uint8_t connectivityType,
243         FindDeviceCallback& callback, QualityOfService QoS)
244 #else
245     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
246         const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
247 #endif
248     {
249         OCStackResult result;
250
251         OCCallbackData cbdata = {0};
252
253         ClientCallbackContext::DeviceListenContext* context =
254             new ClientCallbackContext::DeviceListenContext();
255         context->callback = callback;
256         context->clientWrapper = shared_from_this();
257
258         cbdata.context =  static_cast<void*>(context);
259         cbdata.cb = listenDeviceCallback;
260         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
261
262         auto cLock = m_csdkLock.lock();
263         if(cLock)
264         {
265             std::lock_guard<std::recursive_mutex> lock(*cLock);
266             OCDoHandle handle;
267 #ifdef CA_INT
268             result = OCDoResource(&handle, OC_REST_GET,
269                                   deviceURI.c_str(),
270                                   nullptr, nullptr, connectivityType,
271                                   static_cast<OCQualityOfService>(QoS),
272                                   &cbdata,
273                                   NULL, 0);
274 #else
275             result = OCDoResource(&handle, OC_REST_GET,
276                                   deviceURI.c_str(),
277                                   nullptr, nullptr,
278                                   static_cast<OCQualityOfService>(QoS),
279                                   &cbdata,
280                                   NULL, 0);
281 #endif
282         }
283         else
284         {
285             result = OC_STACK_ERROR;
286         }
287         return result;
288     }
289
290     void parseServerHeaderOptions(OCClientResponse* clientResponse,
291                     HeaderOptions& serverHeaderOptions)
292     {
293         if(clientResponse)
294         {
295             // Parse header options from server
296             uint16_t optionID;
297             std::string optionData;
298
299             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
300             {
301                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
302                 optionData = reinterpret_cast<const char*>
303                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
304                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
305                 serverHeaderOptions.push_back(headerOption);
306             }
307         }
308         else
309         {
310             // clientResponse is invalid
311             // TODO check proper logging
312             std::cout << " Invalid response " << std::endl;
313         }
314     }
315
316     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
317         OCClientResponse* clientResponse)
318     {
319         ClientCallbackContext::GetContext* context =
320             static_cast<ClientCallbackContext::GetContext*>(ctx);
321
322         OCRepresentation rep;
323         HeaderOptions serverHeaderOptions;
324         if(clientResponse->result == OC_STACK_OK)
325         {
326             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
327             rep = parseGetSetCallback(clientResponse);
328         }
329
330         std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
331         exec.detach();
332         return OC_STACK_DELETE_TRANSACTION;
333     }
334
335 #ifdef CA_INT
336     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
337         const std::string& uri, uint8_t connectivityType, const QueryParamsMap& queryParams,
338         const HeaderOptions& headerOptions, GetCallback& callback,
339         QualityOfService QoS)
340 #else
341     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
342         const std::string& uri, const QueryParamsMap& queryParams,
343         const HeaderOptions& headerOptions, GetCallback& callback,
344         QualityOfService QoS)
345 #endif
346     {
347         OCStackResult result;
348         OCCallbackData cbdata = {0};
349
350         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
351         ctx->callback = callback;
352         cbdata.context = static_cast<void*>(ctx);
353         cbdata.cb = &getResourceCallback;
354         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
355
356         auto cLock = m_csdkLock.lock();
357
358         if(cLock)
359         {
360             std::ostringstream os;
361             os << host << assembleSetResourceUri(uri, queryParams).c_str();
362
363             std::lock_guard<std::recursive_mutex> lock(*cLock);
364             OCDoHandle handle;
365             OCHeaderOption options[MAX_HEADER_OPTIONS];
366
367             assembleHeaderOptions(options, headerOptions);
368 #ifdef CA_INT
369             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
370                                   nullptr, nullptr, connectivityType,
371                                   static_cast<OCQualityOfService>(QoS),
372                                   &cbdata,
373                                   options, headerOptions.size());
374 #else
375             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
376                                   nullptr, nullptr,
377                                   static_cast<OCQualityOfService>(QoS),
378                                   &cbdata,
379                                   options, headerOptions.size());
380 #endif
381         }
382         else
383         {
384             delete ctx;
385             result = OC_STACK_ERROR;
386         }
387         return result;
388     }
389
390
391     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
392         OCClientResponse* clientResponse)
393     {
394         ClientCallbackContext::SetContext* context =
395             static_cast<ClientCallbackContext::SetContext*>(ctx);
396         OCRepresentation attrs;
397         HeaderOptions serverHeaderOptions;
398
399         if (OC_STACK_OK               == clientResponse->result ||
400             OC_STACK_RESOURCE_CREATED == clientResponse->result ||
401             OC_STACK_RESOURCE_DELETED == clientResponse->result)
402         {
403             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
404             attrs = parseGetSetCallback(clientResponse);
405         }
406
407         std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
408         exec.detach();
409         return OC_STACK_DELETE_TRANSACTION;
410     }
411
412     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
413         const QueryParamsMap& queryParams)
414     {
415         if(uri.back() == '/')
416         {
417             uri.resize(uri.size()-1);
418         }
419
420         ostringstream paramsList;
421         if(queryParams.size() > 0)
422         {
423             paramsList << '?';
424         }
425
426         for(auto& param : queryParams)
427         {
428             paramsList << param.first <<'='<<param.second<<'&';
429         }
430
431         std::string queryString = paramsList.str();
432         if(queryString.back() == '&')
433         {
434             queryString.resize(queryString.size() - 1);
435         }
436
437         std::string ret = uri + queryString;
438         return ret;
439     }
440
441     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
442     {
443         MessageContainer ocInfo;
444         ocInfo.addRepresentation(rep);
445         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
446     }
447
448 #ifdef CA_INT
449     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
450         const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
451         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
452         PostCallback& callback, QualityOfService QoS)
453 #else
454     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
455         const std::string& uri, const OCRepresentation& rep,
456         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
457         PostCallback& callback, QualityOfService QoS)
458 #endif
459     {
460         OCStackResult result;
461         OCCallbackData cbdata = {0};
462
463         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
464         ctx->callback = callback;
465         cbdata.cb = &setResourceCallback;
466         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
467         cbdata.context = static_cast<void*>(ctx);
468
469         // TODO: in the future the cstack should be combining these two strings!
470         ostringstream os;
471         os << host << assembleSetResourceUri(uri, queryParams).c_str();
472         // TODO: end of above
473
474         auto cLock = m_csdkLock.lock();
475
476         if(cLock)
477         {
478             std::lock_guard<std::recursive_mutex> lock(*cLock);
479             OCHeaderOption options[MAX_HEADER_OPTIONS];
480             OCDoHandle handle;
481
482             assembleHeaderOptions(options, headerOptions);
483 #ifdef CA_INT
484             result = OCDoResource(&handle, OC_REST_POST,
485                                   os.str().c_str(), nullptr,
486                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
487                                   static_cast<OCQualityOfService>(QoS),
488                                   &cbdata, options, headerOptions.size());
489 #else
490             result = OCDoResource(&handle, OC_REST_POST,
491                                   os.str().c_str(), nullptr,
492                                   assembleSetResourcePayload(rep).c_str(),
493                                   static_cast<OCQualityOfService>(QoS),
494                                   &cbdata, options, headerOptions.size());
495 #endif
496         }
497         else
498         {
499             delete ctx;
500             result = OC_STACK_ERROR;
501         }
502
503         return result;
504     }
505
506 #ifdef CA_INT
507     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
508         const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
509         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
510         PutCallback& callback, QualityOfService QoS)
511 #else
512     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
513         const std::string& uri, const OCRepresentation& rep,
514         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
515         PutCallback& callback, QualityOfService QoS)
516 #endif
517     {
518         OCStackResult result;
519         OCCallbackData cbdata = {0};
520
521         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
522         ctx->callback = callback;
523         cbdata.cb = &setResourceCallback;
524         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
525         cbdata.context = static_cast<void*>(ctx);
526
527         // TODO: in the future the cstack should be combining these two strings!
528         ostringstream os;
529         os << host << assembleSetResourceUri(uri, queryParams).c_str();
530         // TODO: end of above
531
532         auto cLock = m_csdkLock.lock();
533
534         if(cLock)
535         {
536             std::lock_guard<std::recursive_mutex> lock(*cLock);
537             OCDoHandle handle;
538             OCHeaderOption options[MAX_HEADER_OPTIONS];
539
540             assembleHeaderOptions(options, headerOptions);
541 #ifdef CA_INT
542             result = OCDoResource(&handle, OC_REST_PUT,
543                                   os.str().c_str(), nullptr,
544                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
545                                   static_cast<OCQualityOfService>(QoS),
546                                   &cbdata,
547                                   options, headerOptions.size());
548 #else
549             result = OCDoResource(&handle, OC_REST_PUT,
550                                   os.str().c_str(), nullptr,
551                                   assembleSetResourcePayload(rep).c_str(),
552                                   static_cast<OCQualityOfService>(QoS),
553                                   &cbdata,
554                                   options, headerOptions.size());
555 #endif
556         }
557         else
558         {
559             delete ctx;
560             result = OC_STACK_ERROR;
561         }
562
563         return result;
564     }
565
566     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
567         OCClientResponse* clientResponse)
568     {
569         ClientCallbackContext::DeleteContext* context =
570             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
571         HeaderOptions serverHeaderOptions;
572
573         if(clientResponse->result == OC_STACK_OK)
574         {
575             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
576         }
577         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
578         exec.detach();
579         return OC_STACK_DELETE_TRANSACTION;
580     }
581
582 #ifdef CA_INT
583     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
584         const std::string& uri, uint8_t connectivityType, const HeaderOptions& headerOptions,
585          DeleteCallback& callback, QualityOfService QoS)
586 #else
587     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
588         const std::string& uri, const HeaderOptions& headerOptions,
589          DeleteCallback& callback, QualityOfService QoS)
590 #endif
591     {
592         OCStackResult result;
593         OCCallbackData cbdata = {0};
594
595         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
596         ctx->callback = callback;
597         cbdata.cb = &deleteResourceCallback;
598         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
599         cbdata.context = static_cast<void*>(ctx);
600
601         ostringstream os;
602         os << host << uri;
603
604         auto cLock = m_csdkLock.lock();
605
606         if(cLock)
607         {
608             OCHeaderOption options[MAX_HEADER_OPTIONS];
609             OCDoHandle handle;
610
611             assembleHeaderOptions(options, headerOptions);
612
613             std::lock_guard<std::recursive_mutex> lock(*cLock);
614 #ifdef CA_INT
615             result = OCDoResource(&handle, OC_REST_DELETE,
616                                   os.str().c_str(), nullptr,
617                                   nullptr, connectivityType,
618                                   static_cast<OCQualityOfService>(m_cfg.QoS),
619                                   &cbdata, options, headerOptions.size());
620 #else
621             result = OCDoResource(&handle, OC_REST_DELETE,
622                                   os.str().c_str(), nullptr,
623                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
624                                   &cbdata, options, headerOptions.size());
625 #endif
626         }
627         else
628         {
629             delete ctx;
630             result = OC_STACK_ERROR;
631         }
632
633         return result;
634     }
635
636     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
637         OCClientResponse* clientResponse)
638     {
639         ClientCallbackContext::ObserveContext* context =
640             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
641         OCRepresentation attrs;
642         HeaderOptions serverHeaderOptions;
643         uint32_t sequenceNumber = clientResponse->sequenceNumber;
644
645         if(clientResponse->result == OC_STACK_OK)
646         {
647             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
648             attrs = parseGetSetCallback(clientResponse);
649         }
650         std::thread exec(context->callback, serverHeaderOptions, attrs,
651                     clientResponse->result, sequenceNumber);
652         exec.detach();
653         return OC_STACK_KEEP_TRANSACTION;
654     }
655
656 #ifdef CA_INT
657     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
658         const std::string& host, const std::string& uri, uint8_t connectivityType,
659         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
660         ObserveCallback& callback, QualityOfService QoS)
661 #else
662     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
663         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
664         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
665 #endif
666     {
667         OCStackResult result;
668         OCCallbackData cbdata = {0};
669
670         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
671         ctx->callback = callback;
672         cbdata.context = static_cast<void*>(ctx);
673         cbdata.cb = &observeResourceCallback;
674         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
675
676         OCMethod method;
677         if (observeType == ObserveType::Observe)
678         {
679             method = OC_REST_OBSERVE;
680         }
681         else if (observeType == ObserveType::ObserveAll)
682         {
683             method = OC_REST_OBSERVE_ALL;
684         }
685         else
686         {
687             method = OC_REST_OBSERVE_ALL;
688         }
689
690         auto cLock = m_csdkLock.lock();
691
692         if(cLock)
693         {
694             std::ostringstream os;
695             os << host << assembleSetResourceUri(uri, queryParams).c_str();
696
697             std::lock_guard<std::recursive_mutex> lock(*cLock);
698             OCHeaderOption options[MAX_HEADER_OPTIONS];
699
700             assembleHeaderOptions(options, headerOptions);
701 #ifdef CA_INT
702             result = OCDoResource(handle, method,
703                                   os.str().c_str(), nullptr,
704                                   nullptr, connectivityType,
705                                   static_cast<OCQualityOfService>(QoS),
706                                   &cbdata,
707                                   options, headerOptions.size());
708 #else
709             result = OCDoResource(handle, method,
710                                   os.str().c_str(), nullptr,
711                                   nullptr,
712                                   static_cast<OCQualityOfService>(QoS),
713                                   &cbdata,
714                                   options, headerOptions.size());
715 #endif
716         }
717         else
718         {
719             delete ctx;
720             return OC_STACK_ERROR;
721         }
722
723         return result;
724     }
725
726     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
727         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
728         QualityOfService QoS)
729     {
730         OCStackResult result;
731         auto cLock = m_csdkLock.lock();
732
733         if(cLock)
734         {
735             std::lock_guard<std::recursive_mutex> lock(*cLock);
736             OCHeaderOption options[MAX_HEADER_OPTIONS];
737
738             assembleHeaderOptions(options, headerOptions);
739             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
740                     headerOptions.size());
741         }
742         else
743         {
744             result = OC_STACK_ERROR;
745         }
746
747         return result;
748     }
749
750     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
751             OCClientResponse* clientResponse)
752     {
753         char stringAddress[DEV_ADDR_SIZE_MAX];
754         ostringstream os;
755         uint16_t port;
756
757         if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
758                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
759         {
760             os<<stringAddress<<":"<<port;
761
762             ClientCallbackContext::SubscribePresenceContext* context =
763                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
764
765             std::thread exec(context->callback, clientResponse->result,
766                     clientResponse->sequenceNumber, os.str());
767
768             exec.detach();
769         }
770         else
771         {
772             oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
773                     <<"failed"<< std::flush;
774         }
775         return OC_STACK_KEEP_TRANSACTION;
776     }
777
778 #ifdef CA_INT
779     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
780         const std::string& host, const std::string& resourceType, uint8_t connectivityType,
781         SubscribeCallback& presenceHandler)
782 #else
783     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
784         const std::string& host, const std::string& resourceType,
785         SubscribeCallback& presenceHandler)
786 #endif
787     {
788         OCCallbackData cbdata = {0};
789
790         ClientCallbackContext::SubscribePresenceContext* ctx =
791             new ClientCallbackContext::SubscribePresenceContext();
792         ctx->callback = presenceHandler;
793         cbdata.cb = &subscribePresenceCallback;
794         cbdata.context = static_cast<void*>(ctx);
795         cbdata.cd = [](void* c)
796             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
797         auto cLock = m_csdkLock.lock();
798
799         std::ostringstream os;
800         os << host << "/oc/presence";
801
802         if(!resourceType.empty())
803         {
804             os << "?rt=" << resourceType;
805         }
806
807         if(!cLock)
808         {
809             delete ctx;
810             return OC_STACK_ERROR;
811         }
812
813 #ifdef CA_INT
814         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
815                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
816 #else
817         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
818                             OC_LOW_QOS, &cbdata, NULL, 0);
819 #endif
820     }
821
822     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
823     {
824         OCStackResult result;
825         auto cLock = m_csdkLock.lock();
826
827         if(cLock)
828         {
829             std::lock_guard<std::recursive_mutex> lock(*cLock);
830             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
831         }
832         else
833         {
834             result = OC_STACK_ERROR;
835         }
836
837         return result;
838     }
839
840     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
841     {
842         qos = m_cfg.QoS;
843         return OC_STACK_OK;
844     }
845
846     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
847            const HeaderOptions& headerOptions)
848     {
849         int i = 0;
850
851         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
852         {
853             options[i].protocolID = OC_COAP_ID;
854             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
855             options[i].optionLength = (it->getOptionData()).length() + 1;
856             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
857                     (it->getOptionData()).length() + 1);
858             i++;
859         }
860     }
861 }