[CA-Integration] Added connectivity type param in APIs
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             throw OCException(OC::Exception::STR_NULL_RESPONSE, OC_STACK_ERROR);
100         }
101
102         MessageContainer oc;
103         oc.setJSONRepresentation(clientResponse->resJSONPayload);
104
105         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
106         if(it == oc.representations().end())
107         {
108             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_ERROR);
109         }
110
111         // first one is considered the root, everything else is considered a child of this one.
112         OCRepresentation root = *it;
113         ++it;
114
115         std::for_each(it, oc.representations().end(),
116                 [&root](const OCRepresentation& repItr)
117                 {root.addChild(repItr);});
118         return root;
119
120     }
121
122     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
123         OCClientResponse* clientResponse)
124     {
125         ClientCallbackContext::ListenContext* context =
126             static_cast<ClientCallbackContext::ListenContext*>(ctx);
127
128         if(clientResponse->result != OC_STACK_OK)
129         {
130             oclog() << "listenCallback(): failed to create resource. clientResponse: "
131                     << clientResponse->result
132                     << std::flush;
133
134             return OC_STACK_KEEP_TRANSACTION;
135         }
136
137         auto clientWrapper = context->clientWrapper.lock();
138
139         if(!clientWrapper)
140         {
141             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
142                     << std::flush;
143             return OC_STACK_KEEP_TRANSACTION;
144         }
145
146         std::stringstream requestStream;
147         requestStream << clientResponse->resJSONPayload;
148
149         try
150         {
151             ListenOCContainer container(clientWrapper, *clientResponse->addr,
152                     requestStream);
153
154             // loop to ensure valid construction of all resources
155             for(auto resource : container.Resources())
156             {
157                 std::thread exec(context->callback, resource);
158                 exec.detach();
159             }
160
161         }
162         catch(const std::exception& e)
163         {
164             oclog() << "listenCallback failed to parse a malformed message: "
165                     << e.what()
166                     << std::endl <<std::endl
167                     << clientResponse->result
168                     << std::flush;
169             return OC_STACK_KEEP_TRANSACTION;
170         }
171
172         return OC_STACK_KEEP_TRANSACTION;
173     }
174
175 #ifdef CA_INT
176     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
177         const std::string& resourceType, uint8_t connectivityType,
178         FindCallback& callback, QualityOfService QoS)
179 #else
180     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
181         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
182 #endif
183     {
184         OCStackResult result;
185
186         OCCallbackData cbdata = {0};
187
188         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
189         context->callback = callback;
190         context->clientWrapper = shared_from_this();
191
192         cbdata.context =  static_cast<void*>(context);
193         cbdata.cb = listenCallback;
194         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
195
196         auto cLock = m_csdkLock.lock();
197         if(cLock)
198         {
199             std::lock_guard<std::recursive_mutex> lock(*cLock);
200             OCDoHandle handle;
201 #ifdef CA_INT
202             result = OCDoResource(&handle, OC_REST_GET,
203                                   resourceType.c_str(),
204                                   nullptr, nullptr, connectivityType,
205                                   static_cast<OCQualityOfService>(QoS),
206                                   &cbdata,
207                                   NULL, 0);
208 #else
209             result = OCDoResource(&handle, OC_REST_GET,
210                                   resourceType.c_str(),
211                                   nullptr, nullptr,
212                                   static_cast<OCQualityOfService>(QoS),
213                                   &cbdata,
214                                   NULL, 0);
215 #endif
216         }
217         else
218         {
219             delete context;
220             result = OC_STACK_ERROR;
221         }
222         return result;
223     }
224
225     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
226             OCClientResponse* clientResponse)
227     {
228         ClientCallbackContext::DeviceListenContext* context =
229             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
230
231         OCRepresentation rep = parseGetSetCallback(clientResponse);
232         std::thread exec(context->callback, rep);
233         exec.detach();
234
235         return OC_STACK_KEEP_TRANSACTION;
236     }
237
238 #ifdef CA_INT
239     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
240         const std::string& deviceURI, uint8_t connectivityType,
241         FindDeviceCallback& callback, QualityOfService QoS)
242 #else
243     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
244         const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
245 #endif
246     {
247         OCStackResult result;
248
249         OCCallbackData cbdata = {0};
250
251         ClientCallbackContext::DeviceListenContext* context =
252             new ClientCallbackContext::DeviceListenContext();
253         context->callback = callback;
254         context->clientWrapper = shared_from_this();
255
256         cbdata.context =  static_cast<void*>(context);
257         cbdata.cb = listenDeviceCallback;
258         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
259
260         auto cLock = m_csdkLock.lock();
261         if(cLock)
262         {
263             std::lock_guard<std::recursive_mutex> lock(*cLock);
264             OCDoHandle handle;
265 #ifdef CA_INT
266             result = OCDoResource(&handle, OC_REST_GET,
267                                   deviceURI.c_str(),
268                                   nullptr, nullptr, connectivityType,
269                                   static_cast<OCQualityOfService>(QoS),
270                                   &cbdata,
271                                   NULL, 0);
272 #else
273             result = OCDoResource(&handle, OC_REST_GET,
274                                   deviceURI.c_str(),
275                                   nullptr, nullptr,
276                                   static_cast<OCQualityOfService>(QoS),
277                                   &cbdata,
278                                   NULL, 0);
279 #endif
280         }
281         else
282         {
283             result = OC_STACK_ERROR;
284         }
285         return result;
286     }
287
288     void parseServerHeaderOptions(OCClientResponse* clientResponse,
289                     HeaderOptions& serverHeaderOptions)
290     {
291         if(clientResponse)
292         {
293             // Parse header options from server
294             uint16_t optionID;
295             std::string optionData;
296
297             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
298             {
299                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
300                 optionData = reinterpret_cast<const char*>
301                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
302                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
303                 serverHeaderOptions.push_back(headerOption);
304             }
305         }
306         else
307         {
308             // clientResponse is invalid
309             // TODO check proper logging
310             std::cout << " Invalid response " << std::endl;
311         }
312     }
313
314     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
315         OCClientResponse* clientResponse)
316     {
317         ClientCallbackContext::GetContext* context =
318             static_cast<ClientCallbackContext::GetContext*>(ctx);
319
320         OCRepresentation rep;
321         HeaderOptions serverHeaderOptions;
322         if(clientResponse->result == OC_STACK_OK)
323         {
324             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
325             rep = parseGetSetCallback(clientResponse);
326         }
327
328         std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
329         exec.detach();
330         return OC_STACK_DELETE_TRANSACTION;
331     }
332
333 #ifdef CA_INT
334     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
335         const std::string& uri, uint8_t connectivityType, const QueryParamsMap& queryParams,
336         const HeaderOptions& headerOptions, GetCallback& callback,
337         QualityOfService QoS)
338 #else
339     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
340         const std::string& uri, const QueryParamsMap& queryParams,
341         const HeaderOptions& headerOptions, GetCallback& callback,
342         QualityOfService QoS)
343 #endif
344     {
345         OCStackResult result;
346         OCCallbackData cbdata = {0};
347
348         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
349         ctx->callback = callback;
350         cbdata.context = static_cast<void*>(ctx);
351         cbdata.cb = &getResourceCallback;
352         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
353
354         auto cLock = m_csdkLock.lock();
355
356         if(cLock)
357         {
358             std::ostringstream os;
359             os << host << assembleSetResourceUri(uri, queryParams).c_str();
360
361             std::lock_guard<std::recursive_mutex> lock(*cLock);
362             OCDoHandle handle;
363             OCHeaderOption options[MAX_HEADER_OPTIONS];
364
365             assembleHeaderOptions(options, headerOptions);
366 #ifdef CA_INT
367             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
368                                   nullptr, nullptr, connectivityType,
369                                   static_cast<OCQualityOfService>(QoS),
370                                   &cbdata,
371                                   options, headerOptions.size());
372 #else
373             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
374                                   nullptr, nullptr,
375                                   static_cast<OCQualityOfService>(QoS),
376                                   &cbdata,
377                                   options, headerOptions.size());
378 #endif
379         }
380         else
381         {
382             delete ctx;
383             result = OC_STACK_ERROR;
384         }
385         return result;
386     }
387
388
389     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
390         OCClientResponse* clientResponse)
391     {
392         ClientCallbackContext::SetContext* context =
393             static_cast<ClientCallbackContext::SetContext*>(ctx);
394         OCRepresentation attrs;
395         HeaderOptions serverHeaderOptions;
396
397         if (OC_STACK_OK               == clientResponse->result ||
398             OC_STACK_RESOURCE_CREATED == clientResponse->result ||
399             OC_STACK_RESOURCE_DELETED == clientResponse->result)
400         {
401             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
402             attrs = parseGetSetCallback(clientResponse);
403         }
404
405         std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
406         exec.detach();
407         return OC_STACK_DELETE_TRANSACTION;
408     }
409
410     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
411         const QueryParamsMap& queryParams)
412     {
413         if(uri.back() == '/')
414         {
415             uri.resize(uri.size()-1);
416         }
417
418         ostringstream paramsList;
419         if(queryParams.size() > 0)
420         {
421             paramsList << '?';
422         }
423
424         for(auto& param : queryParams)
425         {
426             paramsList << param.first <<'='<<param.second<<'&';
427         }
428
429         std::string queryString = paramsList.str();
430         if(queryString.back() == '&')
431         {
432             queryString.resize(queryString.size() - 1);
433         }
434
435         std::string ret = uri + queryString;
436         return ret;
437     }
438
439     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
440     {
441         MessageContainer ocInfo;
442         ocInfo.addRepresentation(rep);
443         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
444     }
445
446 #ifdef CA_INT
447     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
448         const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
449         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
450         PostCallback& callback, QualityOfService QoS)
451 #else
452     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
453         const std::string& uri, const OCRepresentation& rep,
454         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
455         PostCallback& callback, QualityOfService QoS)
456 #endif
457     {
458         OCStackResult result;
459         OCCallbackData cbdata = {0};
460
461         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
462         ctx->callback = callback;
463         cbdata.cb = &setResourceCallback;
464         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
465         cbdata.context = static_cast<void*>(ctx);
466
467         // TODO: in the future the cstack should be combining these two strings!
468         ostringstream os;
469         os << host << assembleSetResourceUri(uri, queryParams).c_str();
470         // TODO: end of above
471
472         auto cLock = m_csdkLock.lock();
473
474         if(cLock)
475         {
476             std::lock_guard<std::recursive_mutex> lock(*cLock);
477             OCHeaderOption options[MAX_HEADER_OPTIONS];
478             OCDoHandle handle;
479
480             assembleHeaderOptions(options, headerOptions);
481 #ifdef CA_INT
482             result = OCDoResource(&handle, OC_REST_POST,
483                                   os.str().c_str(), nullptr,
484                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
485                                   static_cast<OCQualityOfService>(QoS),
486                                   &cbdata, options, headerOptions.size());
487 #else
488             result = OCDoResource(&handle, OC_REST_POST,
489                                   os.str().c_str(), nullptr,
490                                   assembleSetResourcePayload(rep).c_str(),
491                                   static_cast<OCQualityOfService>(QoS),
492                                   &cbdata, options, headerOptions.size());
493 #endif
494         }
495         else
496         {
497             delete ctx;
498             result = OC_STACK_ERROR;
499         }
500
501         return result;
502     }
503
504 #ifdef CA_INT
505     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
506         const std::string& uri, uint8_t connectivityType, const OCRepresentation& rep,
507         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
508         PutCallback& callback, QualityOfService QoS)
509 #else
510     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
511         const std::string& uri, const OCRepresentation& rep,
512         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
513         PutCallback& callback, QualityOfService QoS)
514 #endif
515     {
516         OCStackResult result;
517         OCCallbackData cbdata = {0};
518
519         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
520         ctx->callback = callback;
521         cbdata.cb = &setResourceCallback;
522         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
523         cbdata.context = static_cast<void*>(ctx);
524
525         // TODO: in the future the cstack should be combining these two strings!
526         ostringstream os;
527         os << host << assembleSetResourceUri(uri, queryParams).c_str();
528         // TODO: end of above
529
530         auto cLock = m_csdkLock.lock();
531
532         if(cLock)
533         {
534             std::lock_guard<std::recursive_mutex> lock(*cLock);
535             OCDoHandle handle;
536             OCHeaderOption options[MAX_HEADER_OPTIONS];
537
538             assembleHeaderOptions(options, headerOptions);
539 #ifdef CA_INT
540             result = OCDoResource(&handle, OC_REST_PUT,
541                                   os.str().c_str(), nullptr,
542                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
543                                   static_cast<OCQualityOfService>(QoS),
544                                   &cbdata,
545                                   options, headerOptions.size());
546 #else
547             result = OCDoResource(&handle, OC_REST_PUT,
548                                   os.str().c_str(), nullptr,
549                                   assembleSetResourcePayload(rep).c_str(),
550                                   static_cast<OCQualityOfService>(QoS),
551                                   &cbdata,
552                                   options, headerOptions.size());
553 #endif
554         }
555         else
556         {
557             delete ctx;
558             result = OC_STACK_ERROR;
559         }
560
561         return result;
562     }
563
564     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
565         OCClientResponse* clientResponse)
566     {
567         ClientCallbackContext::DeleteContext* context =
568             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
569         HeaderOptions serverHeaderOptions;
570
571         if(clientResponse->result == OC_STACK_OK)
572         {
573             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
574         }
575         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
576         exec.detach();
577         return OC_STACK_DELETE_TRANSACTION;
578     }
579
580 #ifdef CA_INT
581     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
582         const std::string& uri, uint8_t connectivityType, const HeaderOptions& headerOptions,
583          DeleteCallback& callback, QualityOfService QoS)
584 #else
585     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
586         const std::string& uri, const HeaderOptions& headerOptions,
587          DeleteCallback& callback, QualityOfService QoS)
588 #endif
589     {
590         OCStackResult result;
591         OCCallbackData cbdata = {0};
592
593         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
594         ctx->callback = callback;
595         cbdata.cb = &deleteResourceCallback;
596         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
597         cbdata.context = static_cast<void*>(ctx);
598
599         ostringstream os;
600         os << host << uri;
601
602         auto cLock = m_csdkLock.lock();
603
604         if(cLock)
605         {
606             OCHeaderOption options[MAX_HEADER_OPTIONS];
607             OCDoHandle handle;
608
609             assembleHeaderOptions(options, headerOptions);
610
611             std::lock_guard<std::recursive_mutex> lock(*cLock);
612 #ifdef CA_INT
613             result = OCDoResource(&handle, OC_REST_DELETE,
614                                   os.str().c_str(), nullptr,
615                                   nullptr, connectivityType,
616                                   static_cast<OCQualityOfService>(m_cfg.QoS),
617                                   &cbdata, options, headerOptions.size());
618 #else
619             result = OCDoResource(&handle, OC_REST_DELETE,
620                                   os.str().c_str(), nullptr,
621                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
622                                   &cbdata, options, headerOptions.size());
623 #endif
624         }
625         else
626         {
627             delete ctx;
628             result = OC_STACK_ERROR;
629         }
630
631         return result;
632     }
633
634     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
635         OCClientResponse* clientResponse)
636     {
637         ClientCallbackContext::ObserveContext* context =
638             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
639         OCRepresentation attrs;
640         HeaderOptions serverHeaderOptions;
641         uint32_t sequenceNumber = clientResponse->sequenceNumber;
642
643         if(clientResponse->result == OC_STACK_OK)
644         {
645             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
646             attrs = parseGetSetCallback(clientResponse);
647         }
648         std::thread exec(context->callback, serverHeaderOptions, attrs,
649                     clientResponse->result, sequenceNumber);
650         exec.detach();
651         return OC_STACK_KEEP_TRANSACTION;
652     }
653
654 #ifdef CA_INT
655     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
656         const std::string& host, const std::string& uri, uint8_t connectivityType,
657         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
658         ObserveCallback& callback, QualityOfService QoS)
659 #else
660     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
661         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
662         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
663 #endif
664     {
665         OCStackResult result;
666         OCCallbackData cbdata = {0};
667
668         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
669         ctx->callback = callback;
670         cbdata.context = static_cast<void*>(ctx);
671         cbdata.cb = &observeResourceCallback;
672         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
673
674         OCMethod method;
675         if (observeType == ObserveType::Observe)
676         {
677             method = OC_REST_OBSERVE;
678         }
679         else if (observeType == ObserveType::ObserveAll)
680         {
681             method = OC_REST_OBSERVE_ALL;
682         }
683         else
684         {
685             method = OC_REST_OBSERVE_ALL;
686         }
687
688         auto cLock = m_csdkLock.lock();
689
690         if(cLock)
691         {
692             std::ostringstream os;
693             os << host << assembleSetResourceUri(uri, queryParams).c_str();
694
695             std::lock_guard<std::recursive_mutex> lock(*cLock);
696             OCHeaderOption options[MAX_HEADER_OPTIONS];
697
698             assembleHeaderOptions(options, headerOptions);
699 #ifdef CA_INT
700             result = OCDoResource(handle, method,
701                                   os.str().c_str(), nullptr,
702                                   nullptr, connectivityType,
703                                   static_cast<OCQualityOfService>(QoS),
704                                   &cbdata,
705                                   options, headerOptions.size());
706 #else
707             result = OCDoResource(handle, method,
708                                   os.str().c_str(), nullptr,
709                                   nullptr,
710                                   static_cast<OCQualityOfService>(QoS),
711                                   &cbdata,
712                                   options, headerOptions.size());
713 #endif
714         }
715         else
716         {
717             delete ctx;
718             return OC_STACK_ERROR;
719         }
720
721         return result;
722     }
723
724     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
725         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
726         QualityOfService QoS)
727     {
728         OCStackResult result;
729         auto cLock = m_csdkLock.lock();
730
731         if(cLock)
732         {
733             std::lock_guard<std::recursive_mutex> lock(*cLock);
734             OCHeaderOption options[MAX_HEADER_OPTIONS];
735
736             assembleHeaderOptions(options, headerOptions);
737             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
738                     headerOptions.size());
739         }
740         else
741         {
742             result = OC_STACK_ERROR;
743         }
744
745         return result;
746     }
747
748     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
749             OCClientResponse* clientResponse)
750     {
751         char stringAddress[DEV_ADDR_SIZE_MAX];
752         ostringstream os;
753         uint16_t port;
754
755         if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
756                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
757         {
758             os<<stringAddress<<":"<<port;
759
760             ClientCallbackContext::SubscribePresenceContext* context =
761                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
762
763             std::thread exec(context->callback, clientResponse->result,
764                     clientResponse->sequenceNumber, os.str());
765
766             exec.detach();
767         }
768         else
769         {
770             oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
771                     <<"failed"<< std::flush;
772         }
773         return OC_STACK_KEEP_TRANSACTION;
774     }
775
776 #ifdef CA_INT
777     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
778         const std::string& host, const std::string& resourceType, uint8_t connectivityType,
779         SubscribeCallback& presenceHandler)
780 #else
781     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
782         const std::string& host, const std::string& resourceType,
783         SubscribeCallback& presenceHandler)
784 #endif
785     {
786         OCCallbackData cbdata = {0};
787
788         ClientCallbackContext::SubscribePresenceContext* ctx =
789             new ClientCallbackContext::SubscribePresenceContext();
790         ctx->callback = presenceHandler;
791         cbdata.cb = &subscribePresenceCallback;
792         cbdata.context = static_cast<void*>(ctx);
793         cbdata.cd = [](void* c)
794             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
795         auto cLock = m_csdkLock.lock();
796
797         std::ostringstream os;
798         os << host << "/oc/presence";
799
800         if(!resourceType.empty())
801         {
802             os << "?rt=" << resourceType;
803         }
804
805         if(!cLock)
806         {
807             delete ctx;
808             return OC_STACK_ERROR;
809         }
810
811 #ifdef CA_INT
812         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
813                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
814 #else
815         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
816                             OC_LOW_QOS, &cbdata, NULL, 0);
817 #endif
818     }
819
820     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
821     {
822         OCStackResult result;
823         auto cLock = m_csdkLock.lock();
824
825         if(cLock)
826         {
827             std::lock_guard<std::recursive_mutex> lock(*cLock);
828             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
829         }
830         else
831         {
832             result = OC_STACK_ERROR;
833         }
834
835         return result;
836     }
837
838     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
839     {
840         qos = m_cfg.QoS;
841         return OC_STACK_OK;
842     }
843
844     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
845            const HeaderOptions& headerOptions)
846     {
847         int i = 0;
848
849         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
850         {
851             options[i].protocolID = OC_COAP_ID;
852             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
853             options[i].optionLength = (it->getOptionData()).length() + 1;
854             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
855                     (it->getOptionData()).length() + 1);
856             i++;
857         }
858     }
859 }