2eb81fb7660ecef97983e87eaf4f6f5dd5fd3178
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             return OCRepresentation();
100         }
101
102         MessageContainer oc;
103         try
104         {
105             oc.setJSONRepresentation(clientResponse->resJSONPayload);
106         }
107         catch (cereal::RapidJSONException& ex)
108         {
109             oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
112         }
113         catch (cereal::Exception& ex)
114         {
115             oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
118         }
119
120         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121         if(it == oc.representations().end())
122         {
123             return OCRepresentation();
124         }
125
126         // first one is considered the root, everything else is considered a child of this one.
127         OCRepresentation root = *it;
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if(clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         auto clientWrapper = context->clientWrapper.lock();
153
154         if(!clientWrapper)
155         {
156             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
157                     << std::flush;
158             return OC_STACK_KEEP_TRANSACTION;
159         }
160
161         std::stringstream requestStream;
162         requestStream << clientResponse->resJSONPayload;
163
164         try
165         {
166
167             ListenOCContainer container(clientWrapper, *clientResponse->addr,
168                     clientResponse->connType, requestStream);
169             // loop to ensure valid construction of all resources
170             for(auto resource : container.Resources())
171             {
172                 std::thread exec(context->callback, resource);
173                 exec.detach();
174             }
175
176         }
177         catch(const std::exception& e)
178         {
179             oclog() << "listenCallback failed to parse a malformed message: "
180                     << e.what()
181                     << std::endl
182                     << clientResponse->resJSONPayload
183                     << std::endl
184                     << clientResponse->result
185                     << std::flush;
186             return OC_STACK_KEEP_TRANSACTION;
187         }
188
189         return OC_STACK_KEEP_TRANSACTION;
190     }
191
192     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
193         const std::string& resourceType, OCConnectivityType connectivityType,
194         FindCallback& callback, QualityOfService QoS)
195     {
196         OCStackResult result;
197
198         OCCallbackData cbdata = {0};
199
200         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
201         context->callback = callback;
202         context->clientWrapper = shared_from_this();
203
204         cbdata.context =  static_cast<void*>(context);
205         cbdata.cb = listenCallback;
206         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
207
208         auto cLock = m_csdkLock.lock();
209         if(cLock)
210         {
211             std::lock_guard<std::recursive_mutex> lock(*cLock);
212             result = OCDoResource(nullptr, OC_REST_GET,
213                                   resourceType.c_str(),
214                                   nullptr, nullptr, connectivityType,
215                                   static_cast<OCQualityOfService>(QoS),
216                                   &cbdata,
217                                   NULL, 0);
218         }
219         else
220         {
221             delete context;
222             result = OC_STACK_ERROR;
223         }
224         return result;
225     }
226
227     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
228             OCClientResponse* clientResponse)
229     {
230         ClientCallbackContext::DeviceListenContext* context =
231             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
232
233         try
234         {
235             OCRepresentation rep = parseGetSetCallback(clientResponse);
236             std::thread exec(context->callback, rep);
237             exec.detach();
238         }
239         catch(OC::OCException& e)
240         {
241             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
242                 <<e.what() <<std::flush;
243         }
244
245         return OC_STACK_KEEP_TRANSACTION;
246     }
247
248     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
249         const std::string& deviceURI, OCConnectivityType connectivityType,
250         FindDeviceCallback& callback, QualityOfService QoS)
251     {
252         OCStackResult result;
253
254         OCCallbackData cbdata = {0};
255         ClientCallbackContext::DeviceListenContext* context =
256             new ClientCallbackContext::DeviceListenContext();
257         context->callback = callback;
258         context->clientWrapper = shared_from_this();
259         cbdata.context =  static_cast<void*>(context);
260         cbdata.cb = listenDeviceCallback;
261         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
262
263         auto cLock = m_csdkLock.lock();
264         if(cLock)
265         {
266             std::lock_guard<std::recursive_mutex> lock(*cLock);
267             result = OCDoResource(nullptr, OC_REST_GET,
268                                   deviceURI.c_str(),
269                                   nullptr, nullptr, connectivityType,
270                                   static_cast<OCQualityOfService>(QoS),
271                                   &cbdata,
272                                   NULL, 0);
273         }
274         else
275         {
276             delete context;
277             result = OC_STACK_ERROR;
278         }
279         return result;
280     }
281
282     void parseServerHeaderOptions(OCClientResponse* clientResponse,
283                     HeaderOptions& serverHeaderOptions)
284     {
285         if(clientResponse)
286         {
287             // Parse header options from server
288             uint16_t optionID;
289             std::string optionData;
290
291             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
292             {
293                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
294                 optionData = reinterpret_cast<const char*>
295                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
296                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
297                 serverHeaderOptions.push_back(headerOption);
298             }
299         }
300         else
301         {
302             // clientResponse is invalid
303             // TODO check proper logging
304             std::cout << " Invalid response " << std::endl;
305         }
306     }
307
308     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
309         OCClientResponse* clientResponse)
310     {
311         ClientCallbackContext::GetContext* context =
312             static_cast<ClientCallbackContext::GetContext*>(ctx);
313
314         OCRepresentation rep;
315         HeaderOptions serverHeaderOptions;
316         OCStackResult result = clientResponse->result;
317         if(result == OC_STACK_OK)
318         {
319             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
320             try
321             {
322                 rep = parseGetSetCallback(clientResponse);
323             }
324             catch(OC::OCException& e)
325             {
326                 result = e.code();
327             }
328         }
329
330         std::thread exec(context->callback, serverHeaderOptions, rep, result);
331         exec.detach();
332         return OC_STACK_DELETE_TRANSACTION;
333     }
334
335     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
336         const std::string& uri, OCConnectivityType connectivityType,
337         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
338         GetCallback& callback, QualityOfService QoS)
339     {
340         OCStackResult result;
341         OCCallbackData cbdata = {0};
342
343         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
344         ctx->callback = callback;
345         cbdata.context = static_cast<void*>(ctx);
346         cbdata.cb = &getResourceCallback;
347         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
348
349         auto cLock = m_csdkLock.lock();
350
351         if(cLock)
352         {
353             std::ostringstream os;
354             os << host << assembleSetResourceUri(uri, queryParams).c_str();
355
356             std::lock_guard<std::recursive_mutex> lock(*cLock);
357             OCHeaderOption options[MAX_HEADER_OPTIONS];
358
359             assembleHeaderOptions(options, headerOptions);
360
361             result = OCDoResource(nullptr, OC_REST_GET, os.str().c_str(),
362                                   nullptr, nullptr, connectivityType,
363                                   static_cast<OCQualityOfService>(QoS),
364                                   &cbdata,
365                                   options, headerOptions.size());
366         }
367         else
368         {
369             delete ctx;
370             result = OC_STACK_ERROR;
371         }
372         return result;
373     }
374
375
376     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
377         OCClientResponse* clientResponse)
378     {
379         ClientCallbackContext::SetContext* context =
380             static_cast<ClientCallbackContext::SetContext*>(ctx);
381         OCRepresentation attrs;
382         HeaderOptions serverHeaderOptions;
383
384         OCStackResult result = clientResponse->result;
385         if (OC_STACK_OK               == result ||
386             OC_STACK_RESOURCE_CREATED == result ||
387             OC_STACK_RESOURCE_DELETED == result)
388         {
389             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
390             try
391             {
392                 attrs = parseGetSetCallback(clientResponse);
393             }
394             catch(OC::OCException& e)
395             {
396                 result = e.code();
397             }
398         }
399
400         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
401         exec.detach();
402         return OC_STACK_DELETE_TRANSACTION;
403     }
404
405     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
406         const QueryParamsMap& queryParams)
407     {
408         if(uri.back() == '/')
409         {
410             uri.resize(uri.size()-1);
411         }
412
413         ostringstream paramsList;
414         if(queryParams.size() > 0)
415         {
416             paramsList << '?';
417         }
418
419         for(auto& param : queryParams)
420         {
421             paramsList << param.first <<'='<<param.second<<'&';
422         }
423
424         std::string queryString = paramsList.str();
425         if(queryString.back() == '&')
426         {
427             queryString.resize(queryString.size() - 1);
428         }
429
430         std::string ret = uri + queryString;
431         return ret;
432     }
433
434     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
435     {
436         MessageContainer ocInfo;
437         ocInfo.addRepresentation(rep);
438         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
439     }
440
441     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
442         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
443         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
444         PostCallback& callback, QualityOfService QoS)
445     {
446         OCStackResult result;
447         OCCallbackData cbdata = {0};
448
449         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
450         ctx->callback = callback;
451         cbdata.cb = &setResourceCallback;
452         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
453         cbdata.context = static_cast<void*>(ctx);
454
455         // TODO: in the future the cstack should be combining these two strings!
456         ostringstream os;
457         os << host << assembleSetResourceUri(uri, queryParams).c_str();
458         // TODO: end of above
459
460         auto cLock = m_csdkLock.lock();
461
462         if(cLock)
463         {
464             std::lock_guard<std::recursive_mutex> lock(*cLock);
465             OCHeaderOption options[MAX_HEADER_OPTIONS];
466
467             assembleHeaderOptions(options, headerOptions);
468             result = OCDoResource(nullptr, OC_REST_POST,
469                                   os.str().c_str(), nullptr,
470                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
471                                   static_cast<OCQualityOfService>(QoS),
472                                   &cbdata, options, headerOptions.size());
473         }
474         else
475         {
476             delete ctx;
477             result = OC_STACK_ERROR;
478         }
479
480         return result;
481     }
482
483     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
484         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
485         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
486         PutCallback& callback, QualityOfService QoS)
487     {
488         OCStackResult result;
489         OCCallbackData cbdata = {0};
490
491         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
492         ctx->callback = callback;
493         cbdata.cb = &setResourceCallback;
494         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
495         cbdata.context = static_cast<void*>(ctx);
496
497         // TODO: in the future the cstack should be combining these two strings!
498         ostringstream os;
499         os << host << assembleSetResourceUri(uri, queryParams).c_str();
500         // TODO: end of above
501
502         auto cLock = m_csdkLock.lock();
503
504         if(cLock)
505         {
506             std::lock_guard<std::recursive_mutex> lock(*cLock);
507             OCDoHandle handle;
508             OCHeaderOption options[MAX_HEADER_OPTIONS];
509
510             assembleHeaderOptions(options, headerOptions);
511             result = OCDoResource(&handle, OC_REST_PUT,
512                                   os.str().c_str(), nullptr,
513                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
514                                   static_cast<OCQualityOfService>(QoS),
515                                   &cbdata,
516                                   options, headerOptions.size());
517         }
518         else
519         {
520             delete ctx;
521             result = OC_STACK_ERROR;
522         }
523
524         return result;
525     }
526
527     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
528         OCClientResponse* clientResponse)
529     {
530         ClientCallbackContext::DeleteContext* context =
531             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
532         HeaderOptions serverHeaderOptions;
533
534         if(clientResponse->result == OC_STACK_OK)
535         {
536             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
537         }
538         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
539         exec.detach();
540         return OC_STACK_DELETE_TRANSACTION;
541     }
542
543     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
544         const std::string& uri, OCConnectivityType connectivityType,
545         const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
546     {
547         OCStackResult result;
548         OCCallbackData cbdata = {0};
549
550         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
551         ctx->callback = callback;
552         cbdata.cb = &deleteResourceCallback;
553         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
554         cbdata.context = static_cast<void*>(ctx);
555
556         ostringstream os;
557         os << host << uri;
558
559         auto cLock = m_csdkLock.lock();
560
561         if(cLock)
562         {
563             OCHeaderOption options[MAX_HEADER_OPTIONS];
564
565             assembleHeaderOptions(options, headerOptions);
566
567             std::lock_guard<std::recursive_mutex> lock(*cLock);
568
569             result = OCDoResource(nullptr, OC_REST_DELETE,
570                                   os.str().c_str(), nullptr,
571                                   nullptr, connectivityType,
572                                   static_cast<OCQualityOfService>(m_cfg.QoS),
573                                   &cbdata, options, headerOptions.size());
574         }
575         else
576         {
577             delete ctx;
578             result = OC_STACK_ERROR;
579         }
580
581         return result;
582     }
583
584     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
585         OCClientResponse* clientResponse)
586     {
587         ClientCallbackContext::ObserveContext* context =
588             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
589         OCRepresentation attrs;
590         HeaderOptions serverHeaderOptions;
591         uint32_t sequenceNumber = clientResponse->sequenceNumber;
592         OCStackResult result = clientResponse->result;
593         if(clientResponse->result == OC_STACK_OK)
594         {
595             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
596             try
597             {
598                 attrs = parseGetSetCallback(clientResponse);
599             }
600             catch(OC::OCException& e)
601             {
602                 result = e.code();
603             }
604         }
605         std::thread exec(context->callback, serverHeaderOptions, attrs,
606                     result, sequenceNumber);
607         exec.detach();
608         if(sequenceNumber == OC_OBSERVE_DEREGISTER)
609         {
610             return OC_STACK_DELETE_TRANSACTION;
611         }
612         return OC_STACK_KEEP_TRANSACTION;
613     }
614
615     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
616         const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
617         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
618         ObserveCallback& callback, QualityOfService QoS)
619     {
620         OCStackResult result;
621         OCCallbackData cbdata = {0};
622
623         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
624         ctx->callback = callback;
625         cbdata.context = static_cast<void*>(ctx);
626         cbdata.cb = &observeResourceCallback;
627         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
628
629         OCMethod method;
630         if (observeType == ObserveType::Observe)
631         {
632             method = OC_REST_OBSERVE;
633         }
634         else if (observeType == ObserveType::ObserveAll)
635         {
636             method = OC_REST_OBSERVE_ALL;
637         }
638         else
639         {
640             method = OC_REST_OBSERVE_ALL;
641         }
642
643         auto cLock = m_csdkLock.lock();
644
645         if(cLock)
646         {
647             std::ostringstream os;
648             os << host << assembleSetResourceUri(uri, queryParams).c_str();
649
650             std::lock_guard<std::recursive_mutex> lock(*cLock);
651             OCHeaderOption options[MAX_HEADER_OPTIONS];
652
653             assembleHeaderOptions(options, headerOptions);
654             result = OCDoResource(handle, method,
655                                   os.str().c_str(), nullptr,
656                                   nullptr, connectivityType,
657                                   static_cast<OCQualityOfService>(QoS),
658                                   &cbdata,
659                                   options, headerOptions.size());
660         }
661         else
662         {
663             delete ctx;
664             return OC_STACK_ERROR;
665         }
666
667         return result;
668     }
669
670     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
671         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
672         QualityOfService QoS)
673     {
674         OCStackResult result;
675         auto cLock = m_csdkLock.lock();
676
677         if(cLock)
678         {
679             std::lock_guard<std::recursive_mutex> lock(*cLock);
680             OCHeaderOption options[MAX_HEADER_OPTIONS];
681
682             assembleHeaderOptions(options, headerOptions);
683             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
684                     headerOptions.size());
685         }
686         else
687         {
688             result = OC_STACK_ERROR;
689         }
690
691         return result;
692     }
693
694     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
695             OCClientResponse* clientResponse)
696     {
697         ostringstream os;
698         uint16_t port;
699         uint8_t a;
700         uint8_t b;
701         uint8_t c;
702         uint8_t d;
703
704         if(OCDevAddrToIPv4Addr(clientResponse->addr, &a, &b, &c, &d) == 0 &&
705                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
706         {
707             os<<static_cast<int>(a)<<"."<<static_cast<int>(b)<<"."<<static_cast<int>(c)
708                     <<"."<<static_cast<int>(d)<<":"<<static_cast<int>(port);
709
710             ClientCallbackContext::SubscribePresenceContext* context =
711                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
712
713             std::thread exec(context->callback, clientResponse->result,
714                     clientResponse->sequenceNumber, os.str());
715
716             exec.detach();
717         }
718         else
719         {
720             oclog() << "subscribePresenceCallback(): OCDevAddrToIPv4Addr() or OCDevAddrToPort() "
721                     <<"failed"<< std::flush;
722         }
723         return OC_STACK_KEEP_TRANSACTION;
724     }
725
726     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
727         const std::string& host, const std::string& resourceType,
728         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
729     {
730         OCCallbackData cbdata = {0};
731
732         ClientCallbackContext::SubscribePresenceContext* ctx =
733             new ClientCallbackContext::SubscribePresenceContext();
734         ctx->callback = presenceHandler;
735         cbdata.cb = &subscribePresenceCallback;
736         cbdata.context = static_cast<void*>(ctx);
737         cbdata.cd = [](void* c)
738             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
739         auto cLock = m_csdkLock.lock();
740
741         std::ostringstream os;
742         os << host << "/oc/presence";
743
744         if(!resourceType.empty())
745         {
746             os << "?rt=" << resourceType;
747         }
748
749         if(!cLock)
750         {
751             delete ctx;
752             return OC_STACK_ERROR;
753         }
754
755         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
756                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
757     }
758
759     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
760     {
761         OCStackResult result;
762         auto cLock = m_csdkLock.lock();
763
764         if(cLock)
765         {
766             std::lock_guard<std::recursive_mutex> lock(*cLock);
767             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
768         }
769         else
770         {
771             result = OC_STACK_ERROR;
772         }
773
774         return result;
775     }
776
777     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
778     {
779         qos = m_cfg.QoS;
780         return OC_STACK_OK;
781     }
782
783     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
784            const HeaderOptions& headerOptions)
785     {
786         int i = 0;
787
788         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
789         {
790             options[i].protocolID = OC_COAP_ID;
791             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
792             options[i].optionLength = (it->getOptionData()).length() + 1;
793             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
794                     (it->getOptionData()).length() + 1);
795             i++;
796         }
797     }
798 }