Resolve Klocwork c++ issues
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             return OCRepresentation();
100         }
101
102         MessageContainer oc;
103         try
104         {
105             oc.setJSONRepresentation(clientResponse->resJSONPayload);
106         }
107         catch (cereal::RapidJSONException& ex)
108         {
109             oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
112         }
113         catch (cereal::Exception& ex)
114         {
115             oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
118         }
119
120         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121         if(it == oc.representations().end())
122         {
123             return OCRepresentation();
124         }
125
126         // first one is considered the root, everything else is considered a child of this one.
127         OCRepresentation root = *it;
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if(clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         auto clientWrapper = context->clientWrapper.lock();
153
154         if(!clientWrapper)
155         {
156             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
157                     << std::flush;
158             return OC_STACK_KEEP_TRANSACTION;
159         }
160
161         std::stringstream requestStream;
162         requestStream << clientResponse->resJSONPayload;
163
164         try
165         {
166
167 #ifdef CA_INT
168             ListenOCContainer container(clientWrapper, *clientResponse->addr,
169                     clientResponse->connType, requestStream);
170 #else
171             ListenOCContainer container(clientWrapper, *clientResponse->addr,
172                     requestStream);
173 #endif
174             // loop to ensure valid construction of all resources
175             for(auto resource : container.Resources())
176             {
177                 std::thread exec(context->callback, resource);
178                 exec.detach();
179             }
180
181         }
182         catch(const std::exception& e)
183         {
184             oclog() << "listenCallback failed to parse a malformed message: "
185                     << e.what()
186                     << std::endl
187                     << clientResponse->resJSONPayload
188                     << std::endl
189                     << clientResponse->result
190                     << std::flush;
191             return OC_STACK_KEEP_TRANSACTION;
192         }
193
194         return OC_STACK_KEEP_TRANSACTION;
195     }
196
197 #ifdef CA_INT
198     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
199         const std::string& resourceType, OCConnectivityType connectivityType,
200         FindCallback& callback, QualityOfService QoS)
201 #else
202     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
203         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
204 #endif
205     {
206         OCStackResult result;
207
208         OCCallbackData cbdata = {0};
209
210         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
211         context->callback = callback;
212         context->clientWrapper = shared_from_this();
213
214         cbdata.context =  static_cast<void*>(context);
215         cbdata.cb = listenCallback;
216         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
217
218         auto cLock = m_csdkLock.lock();
219         if(cLock)
220         {
221             std::lock_guard<std::recursive_mutex> lock(*cLock);
222             OCDoHandle handle;
223 #ifdef CA_INT
224             result = OCDoResource(&handle, OC_REST_GET,
225                                   resourceType.c_str(),
226                                   nullptr, nullptr, connectivityType,
227                                   static_cast<OCQualityOfService>(QoS),
228                                   &cbdata,
229                                   NULL, 0);
230 #else
231             result = OCDoResource(&handle, OC_REST_GET,
232                                   resourceType.c_str(),
233                                   nullptr, nullptr,
234                                   static_cast<OCQualityOfService>(QoS),
235                                   &cbdata,
236                                   NULL, 0);
237 #endif
238         }
239         else
240         {
241             delete context;
242             result = OC_STACK_ERROR;
243         }
244         return result;
245     }
246
247     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
248             OCClientResponse* clientResponse)
249     {
250         ClientCallbackContext::DeviceListenContext* context =
251             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
252
253         try
254         {
255             OCRepresentation rep = parseGetSetCallback(clientResponse);
256             std::thread exec(context->callback, rep);
257             exec.detach();
258         }
259         catch(OC::OCException& e)
260         {
261             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
262                 <<e.what() <<std::flush;
263         }
264
265         return OC_STACK_KEEP_TRANSACTION;
266     }
267
268 #ifdef CA_INT
269     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
270         const std::string& deviceURI, OCConnectivityType connectivityType,
271         FindDeviceCallback& callback, QualityOfService QoS)
272 #else
273     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
274         const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
275 #endif
276     {
277         OCStackResult result;
278
279         OCCallbackData cbdata = {0};
280         ClientCallbackContext::DeviceListenContext* context =
281             new ClientCallbackContext::DeviceListenContext();
282         context->callback = callback;
283         context->clientWrapper = shared_from_this();
284         cbdata.context =  static_cast<void*>(context);
285         cbdata.cb = listenDeviceCallback;
286         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
287
288         auto cLock = m_csdkLock.lock();
289         if(cLock)
290         {
291             std::lock_guard<std::recursive_mutex> lock(*cLock);
292             OCDoHandle handle;
293 #ifdef CA_INT
294             result = OCDoResource(&handle, OC_REST_GET,
295                                   deviceURI.c_str(),
296                                   nullptr, nullptr, connectivityType,
297                                   static_cast<OCQualityOfService>(QoS),
298                                   &cbdata,
299                                   NULL, 0);
300 #else
301             result = OCDoResource(&handle, OC_REST_GET,
302                                   deviceURI.c_str(),
303                                   nullptr, nullptr,
304                                   static_cast<OCQualityOfService>(QoS),
305                                   &cbdata,
306                                   NULL, 0);
307 #endif
308         }
309         else
310         {
311             delete context;
312             result = OC_STACK_ERROR;
313         }
314         return result;
315     }
316
317     void parseServerHeaderOptions(OCClientResponse* clientResponse,
318                     HeaderOptions& serverHeaderOptions)
319     {
320         if(clientResponse)
321         {
322             // Parse header options from server
323             uint16_t optionID;
324             std::string optionData;
325
326             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
327             {
328                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
329                 optionData = reinterpret_cast<const char*>
330                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
331                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
332                 serverHeaderOptions.push_back(headerOption);
333             }
334         }
335         else
336         {
337             // clientResponse is invalid
338             // TODO check proper logging
339             std::cout << " Invalid response " << std::endl;
340         }
341     }
342
343     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
344         OCClientResponse* clientResponse)
345     {
346         ClientCallbackContext::GetContext* context =
347             static_cast<ClientCallbackContext::GetContext*>(ctx);
348
349         OCRepresentation rep;
350         HeaderOptions serverHeaderOptions;
351         OCStackResult result = clientResponse->result;
352         if(result == OC_STACK_OK)
353         {
354             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
355             try
356             {
357                 rep = parseGetSetCallback(clientResponse);
358             }
359             catch(OC::OCException& e)
360             {
361                 result = e.code();
362             }
363         }
364
365         std::thread exec(context->callback, serverHeaderOptions, rep, result);
366         exec.detach();
367         return OC_STACK_DELETE_TRANSACTION;
368     }
369
370 #ifdef CA_INT
371     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
372         const std::string& uri, OCConnectivityType connectivityType,
373         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
374         GetCallback& callback, QualityOfService QoS)
375 #else
376     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
377         const std::string& uri, const QueryParamsMap& queryParams,
378         const HeaderOptions& headerOptions, GetCallback& callback,
379         QualityOfService QoS)
380 #endif
381     {
382         OCStackResult result;
383         OCCallbackData cbdata = {0};
384
385         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
386         ctx->callback = callback;
387         cbdata.context = static_cast<void*>(ctx);
388         cbdata.cb = &getResourceCallback;
389         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
390
391         auto cLock = m_csdkLock.lock();
392
393         if(cLock)
394         {
395             std::ostringstream os;
396             os << host << assembleSetResourceUri(uri, queryParams).c_str();
397
398             std::lock_guard<std::recursive_mutex> lock(*cLock);
399             OCDoHandle handle;
400             OCHeaderOption options[MAX_HEADER_OPTIONS];
401
402             assembleHeaderOptions(options, headerOptions);
403 #ifdef CA_INT
404             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
405                                   nullptr, nullptr, connectivityType,
406                                   static_cast<OCQualityOfService>(QoS),
407                                   &cbdata,
408                                   options, headerOptions.size());
409 #else
410             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
411                                   nullptr, nullptr,
412                                   static_cast<OCQualityOfService>(QoS),
413                                   &cbdata,
414                                   options, headerOptions.size());
415 #endif
416         }
417         else
418         {
419             delete ctx;
420             result = OC_STACK_ERROR;
421         }
422         return result;
423     }
424
425
426     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
427         OCClientResponse* clientResponse)
428     {
429         ClientCallbackContext::SetContext* context =
430             static_cast<ClientCallbackContext::SetContext*>(ctx);
431         OCRepresentation attrs;
432         HeaderOptions serverHeaderOptions;
433
434         OCStackResult result = clientResponse->result;
435         if (OC_STACK_OK               == result ||
436             OC_STACK_RESOURCE_CREATED == result ||
437             OC_STACK_RESOURCE_DELETED == result)
438         {
439             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
440             try
441             {
442                 attrs = parseGetSetCallback(clientResponse);
443             }
444             catch(OC::OCException& e)
445             {
446                 result = e.code();
447             }
448         }
449
450         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
451         exec.detach();
452         return OC_STACK_DELETE_TRANSACTION;
453     }
454
455     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
456         const QueryParamsMap& queryParams)
457     {
458         if(uri.back() == '/')
459         {
460             uri.resize(uri.size()-1);
461         }
462
463         ostringstream paramsList;
464         if(queryParams.size() > 0)
465         {
466             paramsList << '?';
467         }
468
469         for(auto& param : queryParams)
470         {
471             paramsList << param.first <<'='<<param.second<<'&';
472         }
473
474         std::string queryString = paramsList.str();
475         if(queryString.back() == '&')
476         {
477             queryString.resize(queryString.size() - 1);
478         }
479
480         std::string ret = uri + queryString;
481         return ret;
482     }
483
484     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
485     {
486         MessageContainer ocInfo;
487         ocInfo.addRepresentation(rep);
488         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
489     }
490
491 #ifdef CA_INT
492     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
493         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
494         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
495         PostCallback& callback, QualityOfService QoS)
496 #else
497     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
498         const std::string& uri, const OCRepresentation& rep,
499         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
500         PostCallback& callback, QualityOfService QoS)
501 #endif
502     {
503         OCStackResult result;
504         OCCallbackData cbdata = {0};
505
506         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
507         ctx->callback = callback;
508         cbdata.cb = &setResourceCallback;
509         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
510         cbdata.context = static_cast<void*>(ctx);
511
512         // TODO: in the future the cstack should be combining these two strings!
513         ostringstream os;
514         os << host << assembleSetResourceUri(uri, queryParams).c_str();
515         // TODO: end of above
516
517         auto cLock = m_csdkLock.lock();
518
519         if(cLock)
520         {
521             std::lock_guard<std::recursive_mutex> lock(*cLock);
522             OCHeaderOption options[MAX_HEADER_OPTIONS];
523             OCDoHandle handle;
524
525             assembleHeaderOptions(options, headerOptions);
526 #ifdef CA_INT
527             result = OCDoResource(&handle, OC_REST_POST,
528                                   os.str().c_str(), nullptr,
529                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
530                                   static_cast<OCQualityOfService>(QoS),
531                                   &cbdata, options, headerOptions.size());
532 #else
533             result = OCDoResource(&handle, OC_REST_POST,
534                                   os.str().c_str(), nullptr,
535                                   assembleSetResourcePayload(rep).c_str(),
536                                   static_cast<OCQualityOfService>(QoS),
537                                   &cbdata, options, headerOptions.size());
538 #endif
539         }
540         else
541         {
542             delete ctx;
543             result = OC_STACK_ERROR;
544         }
545
546         return result;
547     }
548
549 #ifdef CA_INT
550     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
551         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
552         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
553         PutCallback& callback, QualityOfService QoS)
554 #else
555     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
556         const std::string& uri, const OCRepresentation& rep,
557         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
558         PutCallback& callback, QualityOfService QoS)
559 #endif
560     {
561         OCStackResult result;
562         OCCallbackData cbdata = {0};
563
564         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
565         ctx->callback = callback;
566         cbdata.cb = &setResourceCallback;
567         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
568         cbdata.context = static_cast<void*>(ctx);
569
570         // TODO: in the future the cstack should be combining these two strings!
571         ostringstream os;
572         os << host << assembleSetResourceUri(uri, queryParams).c_str();
573         // TODO: end of above
574
575         auto cLock = m_csdkLock.lock();
576
577         if(cLock)
578         {
579             std::lock_guard<std::recursive_mutex> lock(*cLock);
580             OCDoHandle handle;
581             OCHeaderOption options[MAX_HEADER_OPTIONS];
582
583             assembleHeaderOptions(options, headerOptions);
584 #ifdef CA_INT
585             result = OCDoResource(&handle, OC_REST_PUT,
586                                   os.str().c_str(), nullptr,
587                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
588                                   static_cast<OCQualityOfService>(QoS),
589                                   &cbdata,
590                                   options, headerOptions.size());
591 #else
592             result = OCDoResource(&handle, OC_REST_PUT,
593                                   os.str().c_str(), nullptr,
594                                   assembleSetResourcePayload(rep).c_str(),
595                                   static_cast<OCQualityOfService>(QoS),
596                                   &cbdata,
597                                   options, headerOptions.size());
598 #endif
599         }
600         else
601         {
602             delete ctx;
603             result = OC_STACK_ERROR;
604         }
605
606         return result;
607     }
608
609     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
610         OCClientResponse* clientResponse)
611     {
612         ClientCallbackContext::DeleteContext* context =
613             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
614         HeaderOptions serverHeaderOptions;
615
616         if(clientResponse->result == OC_STACK_OK)
617         {
618             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
619         }
620         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
621         exec.detach();
622         return OC_STACK_DELETE_TRANSACTION;
623     }
624
625 #ifdef CA_INT
626     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
627         const std::string& uri, OCConnectivityType connectivityType,
628         const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
629 #else
630     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
631         const std::string& uri, const HeaderOptions& headerOptions,
632          DeleteCallback& callback, QualityOfService QoS)
633 #endif
634     {
635         OCStackResult result;
636         OCCallbackData cbdata = {0};
637
638         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
639         ctx->callback = callback;
640         cbdata.cb = &deleteResourceCallback;
641         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
642         cbdata.context = static_cast<void*>(ctx);
643
644         ostringstream os;
645         os << host << uri;
646
647         auto cLock = m_csdkLock.lock();
648
649         if(cLock)
650         {
651             OCHeaderOption options[MAX_HEADER_OPTIONS];
652             OCDoHandle handle;
653
654             assembleHeaderOptions(options, headerOptions);
655
656             std::lock_guard<std::recursive_mutex> lock(*cLock);
657 #ifdef CA_INT
658             result = OCDoResource(&handle, OC_REST_DELETE,
659                                   os.str().c_str(), nullptr,
660                                   nullptr, connectivityType,
661                                   static_cast<OCQualityOfService>(m_cfg.QoS),
662                                   &cbdata, options, headerOptions.size());
663 #else
664             result = OCDoResource(&handle, OC_REST_DELETE,
665                                   os.str().c_str(), nullptr,
666                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
667                                   &cbdata, options, headerOptions.size());
668 #endif
669         }
670         else
671         {
672             delete ctx;
673             result = OC_STACK_ERROR;
674         }
675
676         return result;
677     }
678
679     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
680         OCClientResponse* clientResponse)
681     {
682         ClientCallbackContext::ObserveContext* context =
683             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
684         OCRepresentation attrs;
685         HeaderOptions serverHeaderOptions;
686         uint32_t sequenceNumber = clientResponse->sequenceNumber;
687         OCStackResult result = clientResponse->result;
688         if(clientResponse->result == OC_STACK_OK)
689         {
690             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
691             try
692             {
693                 attrs = parseGetSetCallback(clientResponse);
694             }
695             catch(OC::OCException& e)
696             {
697                 result = e.code();
698             }
699         }
700         std::thread exec(context->callback, serverHeaderOptions, attrs,
701                     result, sequenceNumber);
702         exec.detach();
703         if(sequenceNumber == OC_OBSERVE_DEREGISTER)
704         {
705             return OC_STACK_DELETE_TRANSACTION;
706         }
707         return OC_STACK_KEEP_TRANSACTION;
708     }
709
710 #ifdef CA_INT
711     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
712         const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
713         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
714         ObserveCallback& callback, QualityOfService QoS)
715 #else
716     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
717         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
718         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
719 #endif
720     {
721         OCStackResult result;
722         OCCallbackData cbdata = {0};
723
724         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
725         ctx->callback = callback;
726         cbdata.context = static_cast<void*>(ctx);
727         cbdata.cb = &observeResourceCallback;
728         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
729
730         OCMethod method;
731         if (observeType == ObserveType::Observe)
732         {
733             method = OC_REST_OBSERVE;
734         }
735         else if (observeType == ObserveType::ObserveAll)
736         {
737             method = OC_REST_OBSERVE_ALL;
738         }
739         else
740         {
741             method = OC_REST_OBSERVE_ALL;
742         }
743
744         auto cLock = m_csdkLock.lock();
745
746         if(cLock)
747         {
748             std::ostringstream os;
749             os << host << assembleSetResourceUri(uri, queryParams).c_str();
750
751             std::lock_guard<std::recursive_mutex> lock(*cLock);
752             OCHeaderOption options[MAX_HEADER_OPTIONS];
753
754             assembleHeaderOptions(options, headerOptions);
755 #ifdef CA_INT
756             result = OCDoResource(handle, method,
757                                   os.str().c_str(), nullptr,
758                                   nullptr, connectivityType,
759                                   static_cast<OCQualityOfService>(QoS),
760                                   &cbdata,
761                                   options, headerOptions.size());
762 #else
763             result = OCDoResource(handle, method,
764                                   os.str().c_str(), nullptr,
765                                   nullptr,
766                                   static_cast<OCQualityOfService>(QoS),
767                                   &cbdata,
768                                   options, headerOptions.size());
769 #endif
770         }
771         else
772         {
773             delete ctx;
774             return OC_STACK_ERROR;
775         }
776
777         return result;
778     }
779
780     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
781         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
782         QualityOfService QoS)
783     {
784         OCStackResult result;
785         auto cLock = m_csdkLock.lock();
786
787         if(cLock)
788         {
789             std::lock_guard<std::recursive_mutex> lock(*cLock);
790             OCHeaderOption options[MAX_HEADER_OPTIONS];
791
792             assembleHeaderOptions(options, headerOptions);
793             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
794                     headerOptions.size());
795         }
796         else
797         {
798             result = OC_STACK_ERROR;
799         }
800
801         return result;
802     }
803
804     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
805             OCClientResponse* clientResponse)
806     {
807         char stringAddress[DEV_ADDR_SIZE_MAX];
808         ostringstream os;
809         uint16_t port;
810
811         if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
812                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
813         {
814             os<<stringAddress<<":"<<port;
815
816             ClientCallbackContext::SubscribePresenceContext* context =
817                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
818
819             std::thread exec(context->callback, clientResponse->result,
820                     clientResponse->sequenceNumber, os.str());
821
822             exec.detach();
823         }
824         else
825         {
826             oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
827                     <<"failed"<< std::flush;
828         }
829         return OC_STACK_KEEP_TRANSACTION;
830     }
831
832 #ifdef CA_INT
833     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
834         const std::string& host, const std::string& resourceType,
835         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
836 #else
837     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
838         const std::string& host, const std::string& resourceType,
839         SubscribeCallback& presenceHandler)
840 #endif
841     {
842         OCCallbackData cbdata = {0};
843
844         ClientCallbackContext::SubscribePresenceContext* ctx =
845             new ClientCallbackContext::SubscribePresenceContext();
846         ctx->callback = presenceHandler;
847         cbdata.cb = &subscribePresenceCallback;
848         cbdata.context = static_cast<void*>(ctx);
849         cbdata.cd = [](void* c)
850             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
851         auto cLock = m_csdkLock.lock();
852
853         std::ostringstream os;
854         os << host << "/oc/presence";
855
856         if(!resourceType.empty())
857         {
858             os << "?rt=" << resourceType;
859         }
860
861         if(!cLock)
862         {
863             delete ctx;
864             return OC_STACK_ERROR;
865         }
866
867 #ifdef CA_INT
868         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
869                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
870 #else
871         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
872                             OC_LOW_QOS, &cbdata, NULL, 0);
873 #endif
874     }
875
876     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
877     {
878         OCStackResult result;
879         auto cLock = m_csdkLock.lock();
880
881         if(cLock)
882         {
883             std::lock_guard<std::recursive_mutex> lock(*cLock);
884             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
885         }
886         else
887         {
888             result = OC_STACK_ERROR;
889         }
890
891         return result;
892     }
893
894     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
895     {
896         qos = m_cfg.QoS;
897         return OC_STACK_OK;
898     }
899
900     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
901            const HeaderOptions& headerOptions)
902     {
903         int i = 0;
904
905         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
906         {
907             options[i].protocolID = OC_COAP_ID;
908             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
909             options[i].optionLength = (it->getOptionData()).length() + 1;
910             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
911                     (it->getOptionData()).length() + 1);
912             i++;
913         }
914     }
915 }