Fixed crash on rec'd invalid data.
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             return OCRepresentation();
100         }
101
102         MessageContainer oc;
103         try
104         {
105             oc.setJSONRepresentation(clientResponse->resJSONPayload);
106         }
107         catch (cereal::RapidJSONException& ex)
108         {
109             oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
112         }
113         catch (cereal::Exception& ex)
114         {
115             oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
118         }
119
120         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121         if(it == oc.representations().end())
122         {
123             return OCRepresentation();
124         }
125
126         // first one is considered the root, everything else is considered a child of this one.
127         OCRepresentation root = *it;
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if(clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         auto clientWrapper = context->clientWrapper.lock();
153
154         if(!clientWrapper)
155         {
156             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
157                     << std::flush;
158             return OC_STACK_KEEP_TRANSACTION;
159         }
160
161         std::stringstream requestStream;
162         requestStream << clientResponse->resJSONPayload;
163
164         try
165         {
166
167 #ifdef CA_INT
168             ListenOCContainer container(clientWrapper, *clientResponse->addr,
169                     clientResponse->connType, requestStream);
170 #else
171             ListenOCContainer container(clientWrapper, *clientResponse->addr,
172                     requestStream);
173 #endif
174             // loop to ensure valid construction of all resources
175             for(auto resource : container.Resources())
176             {
177                 std::thread exec(context->callback, resource);
178                 exec.detach();
179             }
180
181         }
182         catch(const std::exception& e)
183         {
184             oclog() << "listenCallback failed to parse a malformed message: "
185                     << e.what()
186                     << std::endl
187                     << clientResponse->resJSONPayload
188                     << std::endl
189                     << clientResponse->result
190                     << std::flush;
191             return OC_STACK_KEEP_TRANSACTION;
192         }
193
194         return OC_STACK_KEEP_TRANSACTION;
195     }
196
197 #ifdef CA_INT
198     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
199         const std::string& resourceType, OCConnectivityType connectivityType,
200         FindCallback& callback, QualityOfService QoS)
201 #else
202     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
203         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
204 #endif
205     {
206         OCStackResult result;
207
208         OCCallbackData cbdata = {0};
209
210         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
211         context->callback = callback;
212         context->clientWrapper = shared_from_this();
213
214         cbdata.context =  static_cast<void*>(context);
215         cbdata.cb = listenCallback;
216         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
217
218         auto cLock = m_csdkLock.lock();
219         if(cLock)
220         {
221             std::lock_guard<std::recursive_mutex> lock(*cLock);
222             OCDoHandle handle;
223 #ifdef CA_INT
224             result = OCDoResource(&handle, OC_REST_GET,
225                                   resourceType.c_str(),
226                                   nullptr, nullptr, connectivityType,
227                                   static_cast<OCQualityOfService>(QoS),
228                                   &cbdata,
229                                   NULL, 0);
230 #else
231             result = OCDoResource(&handle, OC_REST_GET,
232                                   resourceType.c_str(),
233                                   nullptr, nullptr,
234                                   static_cast<OCQualityOfService>(QoS),
235                                   &cbdata,
236                                   NULL, 0);
237 #endif
238         }
239         else
240         {
241             delete context;
242             result = OC_STACK_ERROR;
243         }
244         return result;
245     }
246
247     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
248             OCClientResponse* clientResponse)
249     {
250         ClientCallbackContext::DeviceListenContext* context =
251             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
252
253         try
254         {
255             OCRepresentation rep = parseGetSetCallback(clientResponse);
256             std::thread exec(context->callback, rep);
257             exec.detach();
258         }
259         catch(OC::OCException& e)
260         {
261             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
262                 <<e.what() <<std::flush;
263         }
264
265         return OC_STACK_KEEP_TRANSACTION;
266     }
267
268 #ifdef CA_INT
269     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
270         const std::string& deviceURI, OCConnectivityType connectivityType,
271         FindDeviceCallback& callback, QualityOfService QoS)
272 #else
273     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
274         const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
275 #endif
276     {
277         OCStackResult result;
278
279         OCCallbackData cbdata = {0};
280         ClientCallbackContext::DeviceListenContext* context =
281             new ClientCallbackContext::DeviceListenContext();
282         context->callback = callback;
283         context->clientWrapper = shared_from_this();
284         cbdata.context =  static_cast<void*>(context);
285         cbdata.cb = listenDeviceCallback;
286         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
287
288         auto cLock = m_csdkLock.lock();
289         if(cLock)
290         {
291             std::lock_guard<std::recursive_mutex> lock(*cLock);
292             OCDoHandle handle;
293 #ifdef CA_INT
294             result = OCDoResource(&handle, OC_REST_GET,
295                                   deviceURI.c_str(),
296                                   nullptr, nullptr, connectivityType,
297                                   static_cast<OCQualityOfService>(QoS),
298                                   &cbdata,
299                                   NULL, 0);
300 #else
301             result = OCDoResource(&handle, OC_REST_GET,
302                                   deviceURI.c_str(),
303                                   nullptr, nullptr,
304                                   static_cast<OCQualityOfService>(QoS),
305                                   &cbdata,
306                                   NULL, 0);
307 #endif
308         }
309         else
310         {
311             result = OC_STACK_ERROR;
312         }
313         return result;
314     }
315
316     void parseServerHeaderOptions(OCClientResponse* clientResponse,
317                     HeaderOptions& serverHeaderOptions)
318     {
319         if(clientResponse)
320         {
321             // Parse header options from server
322             uint16_t optionID;
323             std::string optionData;
324
325             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
326             {
327                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
328                 optionData = reinterpret_cast<const char*>
329                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
330                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
331                 serverHeaderOptions.push_back(headerOption);
332             }
333         }
334         else
335         {
336             // clientResponse is invalid
337             // TODO check proper logging
338             std::cout << " Invalid response " << std::endl;
339         }
340     }
341
342     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
343         OCClientResponse* clientResponse)
344     {
345         ClientCallbackContext::GetContext* context =
346             static_cast<ClientCallbackContext::GetContext*>(ctx);
347
348         OCRepresentation rep;
349         HeaderOptions serverHeaderOptions;
350         OCStackResult result = clientResponse->result;
351         if(result == OC_STACK_OK)
352         {
353             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
354             try
355             {
356                 rep = parseGetSetCallback(clientResponse);
357             }
358             catch(OC::OCException& e)
359             {
360                 result = e.code();
361             }
362         }
363
364         std::thread exec(context->callback, serverHeaderOptions, rep, result);
365         exec.detach();
366         return OC_STACK_DELETE_TRANSACTION;
367     }
368
369 #ifdef CA_INT
370     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
371         const std::string& uri, OCConnectivityType connectivityType,
372         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
373         GetCallback& callback, QualityOfService QoS)
374 #else
375     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
376         const std::string& uri, const QueryParamsMap& queryParams,
377         const HeaderOptions& headerOptions, GetCallback& callback,
378         QualityOfService QoS)
379 #endif
380     {
381         OCStackResult result;
382         OCCallbackData cbdata = {0};
383
384         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
385         ctx->callback = callback;
386         cbdata.context = static_cast<void*>(ctx);
387         cbdata.cb = &getResourceCallback;
388         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
389
390         auto cLock = m_csdkLock.lock();
391
392         if(cLock)
393         {
394             std::ostringstream os;
395             os << host << assembleSetResourceUri(uri, queryParams).c_str();
396
397             std::lock_guard<std::recursive_mutex> lock(*cLock);
398             OCDoHandle handle;
399             OCHeaderOption options[MAX_HEADER_OPTIONS];
400
401             assembleHeaderOptions(options, headerOptions);
402 #ifdef CA_INT
403             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
404                                   nullptr, nullptr, connectivityType,
405                                   static_cast<OCQualityOfService>(QoS),
406                                   &cbdata,
407                                   options, headerOptions.size());
408 #else
409             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
410                                   nullptr, nullptr,
411                                   static_cast<OCQualityOfService>(QoS),
412                                   &cbdata,
413                                   options, headerOptions.size());
414 #endif
415         }
416         else
417         {
418             delete ctx;
419             result = OC_STACK_ERROR;
420         }
421         return result;
422     }
423
424
425     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
426         OCClientResponse* clientResponse)
427     {
428         ClientCallbackContext::SetContext* context =
429             static_cast<ClientCallbackContext::SetContext*>(ctx);
430         OCRepresentation attrs;
431         HeaderOptions serverHeaderOptions;
432
433         OCStackResult result = clientResponse->result;
434         if (OC_STACK_OK               == result ||
435             OC_STACK_RESOURCE_CREATED == result ||
436             OC_STACK_RESOURCE_DELETED == result)
437         {
438             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
439             try
440             {
441                 attrs = parseGetSetCallback(clientResponse);
442             }
443             catch(OC::OCException& e)
444             {
445                 result = e.code();
446             }
447         }
448
449         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
450         exec.detach();
451         return OC_STACK_DELETE_TRANSACTION;
452     }
453
454     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
455         const QueryParamsMap& queryParams)
456     {
457         if(uri.back() == '/')
458         {
459             uri.resize(uri.size()-1);
460         }
461
462         ostringstream paramsList;
463         if(queryParams.size() > 0)
464         {
465             paramsList << '?';
466         }
467
468         for(auto& param : queryParams)
469         {
470             paramsList << param.first <<'='<<param.second<<'&';
471         }
472
473         std::string queryString = paramsList.str();
474         if(queryString.back() == '&')
475         {
476             queryString.resize(queryString.size() - 1);
477         }
478
479         std::string ret = uri + queryString;
480         return ret;
481     }
482
483     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
484     {
485         MessageContainer ocInfo;
486         ocInfo.addRepresentation(rep);
487         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
488     }
489
490 #ifdef CA_INT
491     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
492         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
493         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
494         PostCallback& callback, QualityOfService QoS)
495 #else
496     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
497         const std::string& uri, const OCRepresentation& rep,
498         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
499         PostCallback& callback, QualityOfService QoS)
500 #endif
501     {
502         OCStackResult result;
503         OCCallbackData cbdata = {0};
504
505         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
506         ctx->callback = callback;
507         cbdata.cb = &setResourceCallback;
508         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
509         cbdata.context = static_cast<void*>(ctx);
510
511         // TODO: in the future the cstack should be combining these two strings!
512         ostringstream os;
513         os << host << assembleSetResourceUri(uri, queryParams).c_str();
514         // TODO: end of above
515
516         auto cLock = m_csdkLock.lock();
517
518         if(cLock)
519         {
520             std::lock_guard<std::recursive_mutex> lock(*cLock);
521             OCHeaderOption options[MAX_HEADER_OPTIONS];
522             OCDoHandle handle;
523
524             assembleHeaderOptions(options, headerOptions);
525 #ifdef CA_INT
526             result = OCDoResource(&handle, OC_REST_POST,
527                                   os.str().c_str(), nullptr,
528                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
529                                   static_cast<OCQualityOfService>(QoS),
530                                   &cbdata, options, headerOptions.size());
531 #else
532             result = OCDoResource(&handle, OC_REST_POST,
533                                   os.str().c_str(), nullptr,
534                                   assembleSetResourcePayload(rep).c_str(),
535                                   static_cast<OCQualityOfService>(QoS),
536                                   &cbdata, options, headerOptions.size());
537 #endif
538         }
539         else
540         {
541             delete ctx;
542             result = OC_STACK_ERROR;
543         }
544
545         return result;
546     }
547
548 #ifdef CA_INT
549     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
550         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
551         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
552         PutCallback& callback, QualityOfService QoS)
553 #else
554     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
555         const std::string& uri, const OCRepresentation& rep,
556         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
557         PutCallback& callback, QualityOfService QoS)
558 #endif
559     {
560         OCStackResult result;
561         OCCallbackData cbdata = {0};
562
563         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
564         ctx->callback = callback;
565         cbdata.cb = &setResourceCallback;
566         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
567         cbdata.context = static_cast<void*>(ctx);
568
569         // TODO: in the future the cstack should be combining these two strings!
570         ostringstream os;
571         os << host << assembleSetResourceUri(uri, queryParams).c_str();
572         // TODO: end of above
573
574         auto cLock = m_csdkLock.lock();
575
576         if(cLock)
577         {
578             std::lock_guard<std::recursive_mutex> lock(*cLock);
579             OCDoHandle handle;
580             OCHeaderOption options[MAX_HEADER_OPTIONS];
581
582             assembleHeaderOptions(options, headerOptions);
583 #ifdef CA_INT
584             result = OCDoResource(&handle, OC_REST_PUT,
585                                   os.str().c_str(), nullptr,
586                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
587                                   static_cast<OCQualityOfService>(QoS),
588                                   &cbdata,
589                                   options, headerOptions.size());
590 #else
591             result = OCDoResource(&handle, OC_REST_PUT,
592                                   os.str().c_str(), nullptr,
593                                   assembleSetResourcePayload(rep).c_str(),
594                                   static_cast<OCQualityOfService>(QoS),
595                                   &cbdata,
596                                   options, headerOptions.size());
597 #endif
598         }
599         else
600         {
601             delete ctx;
602             result = OC_STACK_ERROR;
603         }
604
605         return result;
606     }
607
608     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
609         OCClientResponse* clientResponse)
610     {
611         ClientCallbackContext::DeleteContext* context =
612             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
613         HeaderOptions serverHeaderOptions;
614
615         if(clientResponse->result == OC_STACK_OK)
616         {
617             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
618         }
619         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
620         exec.detach();
621         return OC_STACK_DELETE_TRANSACTION;
622     }
623
624 #ifdef CA_INT
625     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
626         const std::string& uri, OCConnectivityType connectivityType,
627         const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
628 #else
629     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
630         const std::string& uri, const HeaderOptions& headerOptions,
631          DeleteCallback& callback, QualityOfService QoS)
632 #endif
633     {
634         OCStackResult result;
635         OCCallbackData cbdata = {0};
636
637         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
638         ctx->callback = callback;
639         cbdata.cb = &deleteResourceCallback;
640         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
641         cbdata.context = static_cast<void*>(ctx);
642
643         ostringstream os;
644         os << host << uri;
645
646         auto cLock = m_csdkLock.lock();
647
648         if(cLock)
649         {
650             OCHeaderOption options[MAX_HEADER_OPTIONS];
651             OCDoHandle handle;
652
653             assembleHeaderOptions(options, headerOptions);
654
655             std::lock_guard<std::recursive_mutex> lock(*cLock);
656 #ifdef CA_INT
657             result = OCDoResource(&handle, OC_REST_DELETE,
658                                   os.str().c_str(), nullptr,
659                                   nullptr, connectivityType,
660                                   static_cast<OCQualityOfService>(m_cfg.QoS),
661                                   &cbdata, options, headerOptions.size());
662 #else
663             result = OCDoResource(&handle, OC_REST_DELETE,
664                                   os.str().c_str(), nullptr,
665                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
666                                   &cbdata, options, headerOptions.size());
667 #endif
668         }
669         else
670         {
671             delete ctx;
672             result = OC_STACK_ERROR;
673         }
674
675         return result;
676     }
677
678     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
679         OCClientResponse* clientResponse)
680     {
681         ClientCallbackContext::ObserveContext* context =
682             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
683         OCRepresentation attrs;
684         HeaderOptions serverHeaderOptions;
685         uint32_t sequenceNumber = clientResponse->sequenceNumber;
686         OCStackResult result = clientResponse->result;
687         if(clientResponse->result == OC_STACK_OK)
688         {
689             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
690             try
691             {
692                 attrs = parseGetSetCallback(clientResponse);
693             }
694             catch(OC::OCException& e)
695             {
696                 result = e.code();
697             }
698         }
699         std::thread exec(context->callback, serverHeaderOptions, attrs,
700                     result, sequenceNumber);
701         exec.detach();
702         if(sequenceNumber == OC_OBSERVE_DEREGISTER)
703         {
704             return OC_STACK_DELETE_TRANSACTION;
705         }
706         return OC_STACK_KEEP_TRANSACTION;
707     }
708
709 #ifdef CA_INT
710     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
711         const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
712         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
713         ObserveCallback& callback, QualityOfService QoS)
714 #else
715     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
716         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
717         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
718 #endif
719     {
720         OCStackResult result;
721         OCCallbackData cbdata = {0};
722
723         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
724         ctx->callback = callback;
725         cbdata.context = static_cast<void*>(ctx);
726         cbdata.cb = &observeResourceCallback;
727         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
728
729         OCMethod method;
730         if (observeType == ObserveType::Observe)
731         {
732             method = OC_REST_OBSERVE;
733         }
734         else if (observeType == ObserveType::ObserveAll)
735         {
736             method = OC_REST_OBSERVE_ALL;
737         }
738         else
739         {
740             method = OC_REST_OBSERVE_ALL;
741         }
742
743         auto cLock = m_csdkLock.lock();
744
745         if(cLock)
746         {
747             std::ostringstream os;
748             os << host << assembleSetResourceUri(uri, queryParams).c_str();
749
750             std::lock_guard<std::recursive_mutex> lock(*cLock);
751             OCHeaderOption options[MAX_HEADER_OPTIONS];
752
753             assembleHeaderOptions(options, headerOptions);
754 #ifdef CA_INT
755             result = OCDoResource(handle, method,
756                                   os.str().c_str(), nullptr,
757                                   nullptr, connectivityType,
758                                   static_cast<OCQualityOfService>(QoS),
759                                   &cbdata,
760                                   options, headerOptions.size());
761 #else
762             result = OCDoResource(handle, method,
763                                   os.str().c_str(), nullptr,
764                                   nullptr,
765                                   static_cast<OCQualityOfService>(QoS),
766                                   &cbdata,
767                                   options, headerOptions.size());
768 #endif
769         }
770         else
771         {
772             delete ctx;
773             return OC_STACK_ERROR;
774         }
775
776         return result;
777     }
778
779     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
780         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
781         QualityOfService QoS)
782     {
783         OCStackResult result;
784         auto cLock = m_csdkLock.lock();
785
786         if(cLock)
787         {
788             std::lock_guard<std::recursive_mutex> lock(*cLock);
789             OCHeaderOption options[MAX_HEADER_OPTIONS];
790
791             assembleHeaderOptions(options, headerOptions);
792             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
793                     headerOptions.size());
794         }
795         else
796         {
797             result = OC_STACK_ERROR;
798         }
799
800         return result;
801     }
802
803     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
804             OCClientResponse* clientResponse)
805     {
806         char stringAddress[DEV_ADDR_SIZE_MAX];
807         ostringstream os;
808         uint16_t port;
809
810         if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
811                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
812         {
813             os<<stringAddress<<":"<<port;
814
815             ClientCallbackContext::SubscribePresenceContext* context =
816                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
817
818             std::thread exec(context->callback, clientResponse->result,
819                     clientResponse->sequenceNumber, os.str());
820
821             exec.detach();
822         }
823         else
824         {
825             oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
826                     <<"failed"<< std::flush;
827         }
828         return OC_STACK_KEEP_TRANSACTION;
829     }
830
831 #ifdef CA_INT
832     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
833         const std::string& host, const std::string& resourceType,
834         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
835 #else
836     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
837         const std::string& host, const std::string& resourceType,
838         SubscribeCallback& presenceHandler)
839 #endif
840     {
841         OCCallbackData cbdata = {0};
842
843         ClientCallbackContext::SubscribePresenceContext* ctx =
844             new ClientCallbackContext::SubscribePresenceContext();
845         ctx->callback = presenceHandler;
846         cbdata.cb = &subscribePresenceCallback;
847         cbdata.context = static_cast<void*>(ctx);
848         cbdata.cd = [](void* c)
849             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
850         auto cLock = m_csdkLock.lock();
851
852         std::ostringstream os;
853         os << host << "/oc/presence";
854
855         if(!resourceType.empty())
856         {
857             os << "?rt=" << resourceType;
858         }
859
860         if(!cLock)
861         {
862             delete ctx;
863             return OC_STACK_ERROR;
864         }
865
866 #ifdef CA_INT
867         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
868                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
869 #else
870         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
871                             OC_LOW_QOS, &cbdata, NULL, 0);
872 #endif
873     }
874
875     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
876     {
877         OCStackResult result;
878         auto cLock = m_csdkLock.lock();
879
880         if(cLock)
881         {
882             std::lock_guard<std::recursive_mutex> lock(*cLock);
883             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
884         }
885         else
886         {
887             result = OC_STACK_ERROR;
888         }
889
890         return result;
891     }
892
893     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
894     {
895         qos = m_cfg.QoS;
896         return OC_STACK_OK;
897     }
898
899     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
900            const HeaderOptions& headerOptions)
901     {
902         int i = 0;
903
904         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
905         {
906             options[i].protocolID = OC_COAP_ID;
907             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
908             options[i].optionLength = (it->getOptionData()).length() + 1;
909             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
910                     (it->getOptionData()).length() + 1);
911             i++;
912         }
913     }
914 }