97fe351e6387c058065675c3dff06ac39da60461
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             return OCRepresentation();
100         }
101
102         MessageContainer oc;
103         try
104         {
105             oc.setJSONRepresentation(clientResponse->resJSONPayload);
106         }
107         catch (cereal::RapidJSONException& ex)
108         {
109             oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
112         }
113         catch (cereal::Exception& ex)
114         {
115             oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
118         }
119
120         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121         if(it == oc.representations().end())
122         {
123             return OCRepresentation();
124         }
125
126         // first one is considered the root, everything else is considered a child of this one.
127         OCRepresentation root = *it;
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if(clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         auto clientWrapper = context->clientWrapper.lock();
153
154         if(!clientWrapper)
155         {
156             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
157                     << std::flush;
158             return OC_STACK_KEEP_TRANSACTION;
159         }
160
161         std::stringstream requestStream;
162         requestStream << clientResponse->resJSONPayload;
163
164         try
165         {
166
167             ListenOCContainer container(clientWrapper, *clientResponse->addr,
168                     clientResponse->connType, requestStream);
169             // loop to ensure valid construction of all resources
170             for(auto resource : container.Resources())
171             {
172                 std::thread exec(context->callback, resource);
173                 exec.detach();
174             }
175
176         }
177         catch(const std::exception& e)
178         {
179             oclog() << "listenCallback failed to parse a malformed message: "
180                     << e.what()
181                     << std::endl
182                     << clientResponse->resJSONPayload
183                     << std::endl
184                     << clientResponse->result
185                     << std::flush;
186             return OC_STACK_KEEP_TRANSACTION;
187         }
188
189         return OC_STACK_KEEP_TRANSACTION;
190     }
191
192     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
193         const std::string& resourceType, OCConnectivityType connectivityType,
194         FindCallback& callback, QualityOfService QoS)
195     {
196         OCStackResult result;
197
198         OCCallbackData cbdata = {0};
199
200         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
201         context->callback = callback;
202         context->clientWrapper = shared_from_this();
203
204         cbdata.context =  static_cast<void*>(context);
205         cbdata.cb = listenCallback;
206         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
207
208         auto cLock = m_csdkLock.lock();
209         if(cLock)
210         {
211             std::lock_guard<std::recursive_mutex> lock(*cLock);
212             OCDoHandle handle;
213             result = OCDoResource(&handle, OC_REST_GET,
214                                   resourceType.c_str(),
215                                   nullptr, nullptr, connectivityType,
216                                   static_cast<OCQualityOfService>(QoS),
217                                   &cbdata,
218                                   NULL, 0);
219         }
220         else
221         {
222             delete context;
223             result = OC_STACK_ERROR;
224         }
225         return result;
226     }
227
228     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
229             OCClientResponse* clientResponse)
230     {
231         ClientCallbackContext::DeviceListenContext* context =
232             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
233
234         try
235         {
236             OCRepresentation rep = parseGetSetCallback(clientResponse);
237             std::thread exec(context->callback, rep);
238             exec.detach();
239         }
240         catch(OC::OCException& e)
241         {
242             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
243                 <<e.what() <<std::flush;
244         }
245
246         return OC_STACK_KEEP_TRANSACTION;
247     }
248
249     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
250         const std::string& deviceURI, OCConnectivityType connectivityType,
251         FindDeviceCallback& callback, QualityOfService QoS)
252     {
253         OCStackResult result;
254
255         OCCallbackData cbdata = {0};
256         ClientCallbackContext::DeviceListenContext* context =
257             new ClientCallbackContext::DeviceListenContext();
258         context->callback = callback;
259         context->clientWrapper = shared_from_this();
260         cbdata.context =  static_cast<void*>(context);
261         cbdata.cb = listenDeviceCallback;
262         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
263
264         auto cLock = m_csdkLock.lock();
265         if(cLock)
266         {
267             std::lock_guard<std::recursive_mutex> lock(*cLock);
268             OCDoHandle handle;
269             result = OCDoResource(&handle, OC_REST_GET,
270                                   deviceURI.c_str(),
271                                   nullptr, nullptr, connectivityType,
272                                   static_cast<OCQualityOfService>(QoS),
273                                   &cbdata,
274                                   NULL, 0);
275         }
276         else
277         {
278             delete context;
279             result = OC_STACK_ERROR;
280         }
281         return result;
282     }
283
284     void parseServerHeaderOptions(OCClientResponse* clientResponse,
285                     HeaderOptions& serverHeaderOptions)
286     {
287         if(clientResponse)
288         {
289             // Parse header options from server
290             uint16_t optionID;
291             std::string optionData;
292
293             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
294             {
295                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
296                 optionData = reinterpret_cast<const char*>
297                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
298                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
299                 serverHeaderOptions.push_back(headerOption);
300             }
301         }
302         else
303         {
304             // clientResponse is invalid
305             // TODO check proper logging
306             std::cout << " Invalid response " << std::endl;
307         }
308     }
309
310     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
311         OCClientResponse* clientResponse)
312     {
313         ClientCallbackContext::GetContext* context =
314             static_cast<ClientCallbackContext::GetContext*>(ctx);
315
316         OCRepresentation rep;
317         HeaderOptions serverHeaderOptions;
318         OCStackResult result = clientResponse->result;
319         if(result == OC_STACK_OK)
320         {
321             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
322             try
323             {
324                 rep = parseGetSetCallback(clientResponse);
325             }
326             catch(OC::OCException& e)
327             {
328                 result = e.code();
329             }
330         }
331
332         std::thread exec(context->callback, serverHeaderOptions, rep, result);
333         exec.detach();
334         return OC_STACK_DELETE_TRANSACTION;
335     }
336
337     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
338         const std::string& uri, OCConnectivityType connectivityType,
339         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
340         GetCallback& callback, QualityOfService QoS)
341     {
342         OCStackResult result;
343         OCCallbackData cbdata = {0};
344
345         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
346         ctx->callback = callback;
347         cbdata.context = static_cast<void*>(ctx);
348         cbdata.cb = &getResourceCallback;
349         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
350
351         auto cLock = m_csdkLock.lock();
352
353         if(cLock)
354         {
355             std::ostringstream os;
356             os << host << assembleSetResourceUri(uri, queryParams).c_str();
357
358             std::lock_guard<std::recursive_mutex> lock(*cLock);
359             OCDoHandle handle;
360             OCHeaderOption options[MAX_HEADER_OPTIONS];
361
362             assembleHeaderOptions(options, headerOptions);
363
364             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
365                                   nullptr, nullptr, connectivityType,
366                                   static_cast<OCQualityOfService>(QoS),
367                                   &cbdata,
368                                   options, headerOptions.size());
369         }
370         else
371         {
372             delete ctx;
373             result = OC_STACK_ERROR;
374         }
375         return result;
376     }
377
378
379     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
380         OCClientResponse* clientResponse)
381     {
382         ClientCallbackContext::SetContext* context =
383             static_cast<ClientCallbackContext::SetContext*>(ctx);
384         OCRepresentation attrs;
385         HeaderOptions serverHeaderOptions;
386
387         OCStackResult result = clientResponse->result;
388         if (OC_STACK_OK               == result ||
389             OC_STACK_RESOURCE_CREATED == result ||
390             OC_STACK_RESOURCE_DELETED == result)
391         {
392             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
393             try
394             {
395                 attrs = parseGetSetCallback(clientResponse);
396             }
397             catch(OC::OCException& e)
398             {
399                 result = e.code();
400             }
401         }
402
403         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
404         exec.detach();
405         return OC_STACK_DELETE_TRANSACTION;
406     }
407
408     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
409         const QueryParamsMap& queryParams)
410     {
411         if(uri.back() == '/')
412         {
413             uri.resize(uri.size()-1);
414         }
415
416         ostringstream paramsList;
417         if(queryParams.size() > 0)
418         {
419             paramsList << '?';
420         }
421
422         for(auto& param : queryParams)
423         {
424             paramsList << param.first <<'='<<param.second<<'&';
425         }
426
427         std::string queryString = paramsList.str();
428         if(queryString.back() == '&')
429         {
430             queryString.resize(queryString.size() - 1);
431         }
432
433         std::string ret = uri + queryString;
434         return ret;
435     }
436
437     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
438     {
439         MessageContainer ocInfo;
440         ocInfo.addRepresentation(rep);
441         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
442     }
443
444     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
445         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
446         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
447         PostCallback& callback, QualityOfService QoS)
448     {
449         OCStackResult result;
450         OCCallbackData cbdata = {0};
451
452         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
453         ctx->callback = callback;
454         cbdata.cb = &setResourceCallback;
455         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
456         cbdata.context = static_cast<void*>(ctx);
457
458         // TODO: in the future the cstack should be combining these two strings!
459         ostringstream os;
460         os << host << assembleSetResourceUri(uri, queryParams).c_str();
461         // TODO: end of above
462
463         auto cLock = m_csdkLock.lock();
464
465         if(cLock)
466         {
467             std::lock_guard<std::recursive_mutex> lock(*cLock);
468             OCHeaderOption options[MAX_HEADER_OPTIONS];
469             OCDoHandle handle;
470
471             assembleHeaderOptions(options, headerOptions);
472             result = OCDoResource(&handle, OC_REST_POST,
473                                   os.str().c_str(), nullptr,
474                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
475                                   static_cast<OCQualityOfService>(QoS),
476                                   &cbdata, options, headerOptions.size());
477         }
478         else
479         {
480             delete ctx;
481             result = OC_STACK_ERROR;
482         }
483
484         return result;
485     }
486
487     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
488         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
489         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
490         PutCallback& callback, QualityOfService QoS)
491     {
492         OCStackResult result;
493         OCCallbackData cbdata = {0};
494
495         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
496         ctx->callback = callback;
497         cbdata.cb = &setResourceCallback;
498         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
499         cbdata.context = static_cast<void*>(ctx);
500
501         // TODO: in the future the cstack should be combining these two strings!
502         ostringstream os;
503         os << host << assembleSetResourceUri(uri, queryParams).c_str();
504         // TODO: end of above
505
506         auto cLock = m_csdkLock.lock();
507
508         if(cLock)
509         {
510             std::lock_guard<std::recursive_mutex> lock(*cLock);
511             OCDoHandle handle;
512             OCHeaderOption options[MAX_HEADER_OPTIONS];
513
514             assembleHeaderOptions(options, headerOptions);
515             result = OCDoResource(&handle, OC_REST_PUT,
516                                   os.str().c_str(), nullptr,
517                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
518                                   static_cast<OCQualityOfService>(QoS),
519                                   &cbdata,
520                                   options, headerOptions.size());
521         }
522         else
523         {
524             delete ctx;
525             result = OC_STACK_ERROR;
526         }
527
528         return result;
529     }
530
531     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
532         OCClientResponse* clientResponse)
533     {
534         ClientCallbackContext::DeleteContext* context =
535             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
536         HeaderOptions serverHeaderOptions;
537
538         if(clientResponse->result == OC_STACK_OK)
539         {
540             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
541         }
542         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
543         exec.detach();
544         return OC_STACK_DELETE_TRANSACTION;
545     }
546
547     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
548         const std::string& uri, OCConnectivityType connectivityType,
549         const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
550     {
551         OCStackResult result;
552         OCCallbackData cbdata = {0};
553
554         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
555         ctx->callback = callback;
556         cbdata.cb = &deleteResourceCallback;
557         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
558         cbdata.context = static_cast<void*>(ctx);
559
560         ostringstream os;
561         os << host << uri;
562
563         auto cLock = m_csdkLock.lock();
564
565         if(cLock)
566         {
567             OCHeaderOption options[MAX_HEADER_OPTIONS];
568             OCDoHandle handle;
569
570             assembleHeaderOptions(options, headerOptions);
571
572             std::lock_guard<std::recursive_mutex> lock(*cLock);
573
574             result = OCDoResource(&handle, OC_REST_DELETE,
575                                   os.str().c_str(), nullptr,
576                                   nullptr, connectivityType,
577                                   static_cast<OCQualityOfService>(m_cfg.QoS),
578                                   &cbdata, options, headerOptions.size());
579         }
580         else
581         {
582             delete ctx;
583             result = OC_STACK_ERROR;
584         }
585
586         return result;
587     }
588
589     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
590         OCClientResponse* clientResponse)
591     {
592         ClientCallbackContext::ObserveContext* context =
593             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
594         OCRepresentation attrs;
595         HeaderOptions serverHeaderOptions;
596         uint32_t sequenceNumber = clientResponse->sequenceNumber;
597         OCStackResult result = clientResponse->result;
598         if(clientResponse->result == OC_STACK_OK)
599         {
600             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
601             try
602             {
603                 attrs = parseGetSetCallback(clientResponse);
604             }
605             catch(OC::OCException& e)
606             {
607                 result = e.code();
608             }
609         }
610         std::thread exec(context->callback, serverHeaderOptions, attrs,
611                     result, sequenceNumber);
612         exec.detach();
613         if(sequenceNumber == OC_OBSERVE_DEREGISTER)
614         {
615             return OC_STACK_DELETE_TRANSACTION;
616         }
617         return OC_STACK_KEEP_TRANSACTION;
618     }
619
620     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
621         const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
622         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
623         ObserveCallback& callback, QualityOfService QoS)
624     {
625         OCStackResult result;
626         OCCallbackData cbdata = {0};
627
628         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
629         ctx->callback = callback;
630         cbdata.context = static_cast<void*>(ctx);
631         cbdata.cb = &observeResourceCallback;
632         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
633
634         OCMethod method;
635         if (observeType == ObserveType::Observe)
636         {
637             method = OC_REST_OBSERVE;
638         }
639         else if (observeType == ObserveType::ObserveAll)
640         {
641             method = OC_REST_OBSERVE_ALL;
642         }
643         else
644         {
645             method = OC_REST_OBSERVE_ALL;
646         }
647
648         auto cLock = m_csdkLock.lock();
649
650         if(cLock)
651         {
652             std::ostringstream os;
653             os << host << assembleSetResourceUri(uri, queryParams).c_str();
654
655             std::lock_guard<std::recursive_mutex> lock(*cLock);
656             OCHeaderOption options[MAX_HEADER_OPTIONS];
657
658             assembleHeaderOptions(options, headerOptions);
659             result = OCDoResource(handle, method,
660                                   os.str().c_str(), nullptr,
661                                   nullptr, connectivityType,
662                                   static_cast<OCQualityOfService>(QoS),
663                                   &cbdata,
664                                   options, headerOptions.size());
665         }
666         else
667         {
668             delete ctx;
669             return OC_STACK_ERROR;
670         }
671
672         return result;
673     }
674
675     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
676         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
677         QualityOfService QoS)
678     {
679         OCStackResult result;
680         auto cLock = m_csdkLock.lock();
681
682         if(cLock)
683         {
684             std::lock_guard<std::recursive_mutex> lock(*cLock);
685             OCHeaderOption options[MAX_HEADER_OPTIONS];
686
687             assembleHeaderOptions(options, headerOptions);
688             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
689                     headerOptions.size());
690         }
691         else
692         {
693             result = OC_STACK_ERROR;
694         }
695
696         return result;
697     }
698
699     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
700             OCClientResponse* clientResponse)
701     {
702         char stringAddress[DEV_ADDR_SIZE_MAX];
703         ostringstream os;
704         uint16_t port;
705
706         if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
707                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
708         {
709             os<<stringAddress<<":"<<port;
710
711             ClientCallbackContext::SubscribePresenceContext* context =
712                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
713
714             std::thread exec(context->callback, clientResponse->result,
715                     clientResponse->sequenceNumber, os.str());
716
717             exec.detach();
718         }
719         else
720         {
721             oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
722                     <<"failed"<< std::flush;
723         }
724         return OC_STACK_KEEP_TRANSACTION;
725     }
726
727     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
728         const std::string& host, const std::string& resourceType,
729         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
730     {
731         OCCallbackData cbdata = {0};
732
733         ClientCallbackContext::SubscribePresenceContext* ctx =
734             new ClientCallbackContext::SubscribePresenceContext();
735         ctx->callback = presenceHandler;
736         cbdata.cb = &subscribePresenceCallback;
737         cbdata.context = static_cast<void*>(ctx);
738         cbdata.cd = [](void* c)
739             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
740         auto cLock = m_csdkLock.lock();
741
742         std::ostringstream os;
743         os << host << "/oc/presence";
744
745         if(!resourceType.empty())
746         {
747             os << "?rt=" << resourceType;
748         }
749
750         if(!cLock)
751         {
752             delete ctx;
753             return OC_STACK_ERROR;
754         }
755
756         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
757                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
758     }
759
760     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
761     {
762         OCStackResult result;
763         auto cLock = m_csdkLock.lock();
764
765         if(cLock)
766         {
767             std::lock_guard<std::recursive_mutex> lock(*cLock);
768             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
769         }
770         else
771         {
772             result = OC_STACK_ERROR;
773         }
774
775         return result;
776     }
777
778     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
779     {
780         qos = m_cfg.QoS;
781         return OC_STACK_OK;
782     }
783
784     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
785            const HeaderOptions& headerOptions)
786     {
787         int i = 0;
788
789         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
790         {
791             options[i].protocolID = OC_COAP_ID;
792             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
793             options[i].optionLength = (it->getOptionData()).length() + 1;
794             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
795                     (it->getOptionData()).length() + 1);
796             i++;
797         }
798     }
799 }