aa4991b991ac2d1c4679c20eef59aafdfb7b2982
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         // only stop if we are the ones who actually called 'init'.  We are counting
62         // on the server to do the stop.
63         if(m_cfg.mode == ModeType::Client)
64         {
65             OCStop();
66         }
67     }
68
69     void InProcClientWrapper::listeningFunc()
70     {
71         while(m_threadRun)
72         {
73             OCStackResult result;
74             auto cLock = m_csdkLock.lock();
75             if(cLock)
76             {
77                 std::lock_guard<std::recursive_mutex> lock(*cLock);
78                 result = OCProcess();
79             }
80             else
81             {
82                 result = OC_STACK_ERROR;
83             }
84
85             if(result != OC_STACK_OK)
86             {
87                 // TODO: do something with result if failed?
88             }
89
90             // To minimize CPU utilization we may wish to do this with sleep
91             std::this_thread::sleep_for(std::chrono::milliseconds(10));
92         }
93     }
94
95     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
96     {
97         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
98         {
99             return OCRepresentation();
100         }
101
102         MessageContainer oc;
103         try
104         {
105             oc.setJSONRepresentation(clientResponse->resJSONPayload);
106         }
107         catch (cereal::RapidJSONException& ex)
108         {
109             oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
112         }
113         catch (cereal::Exception& ex)
114         {
115             oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
118         }
119
120         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121         if(it == oc.representations().end())
122         {
123             return OCRepresentation();
124         }
125
126         // first one is considered the root, everything else is considered a child of this one.
127         OCRepresentation root = *it;
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if(clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         auto clientWrapper = context->clientWrapper.lock();
153
154         if(!clientWrapper)
155         {
156             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
157                     << std::flush;
158             return OC_STACK_KEEP_TRANSACTION;
159         }
160
161         std::stringstream requestStream;
162         requestStream << clientResponse->resJSONPayload;
163
164         try
165         {
166
167             ListenOCContainer container(clientWrapper, *clientResponse->addr,
168                     clientResponse->connType, requestStream);
169             // loop to ensure valid construction of all resources
170             for(auto resource : container.Resources())
171             {
172                 std::thread exec(context->callback, resource);
173                 exec.detach();
174             }
175
176         }
177         catch(const std::exception& e)
178         {
179             oclog() << "listenCallback failed to parse a malformed message: "
180                     << e.what()
181                     << std::endl
182                     << clientResponse->resJSONPayload
183                     << std::endl
184                     << clientResponse->result
185                     << std::flush;
186             return OC_STACK_KEEP_TRANSACTION;
187         }
188
189         return OC_STACK_KEEP_TRANSACTION;
190     }
191
192     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
193         const std::string& resourceType, OCConnectivityType connectivityType,
194         FindCallback& callback, QualityOfService QoS)
195     {
196         if(!callback)
197         {
198             return OC_STACK_INVALID_PARAM;
199         }
200
201         OCStackResult result;
202
203         ClientCallbackContext::ListenContext* context =
204             new ClientCallbackContext::ListenContext(callback, shared_from_this());
205         OCCallbackData cbdata(
206                 static_cast<void*>(context),
207                 listenCallback,
208                 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
209             );
210
211         auto cLock = m_csdkLock.lock();
212         if(cLock)
213         {
214             std::lock_guard<std::recursive_mutex> lock(*cLock);
215             result = OCDoResource(nullptr, OC_REST_GET,
216                                   resourceType.c_str(),
217                                   nullptr, nullptr, connectivityType,
218                                   static_cast<OCQualityOfService>(QoS),
219                                   &cbdata,
220                                   nullptr, 0);
221         }
222         else
223         {
224             delete context;
225             result = OC_STACK_ERROR;
226         }
227         return result;
228     }
229
230     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
231             OCClientResponse* clientResponse)
232     {
233         ClientCallbackContext::DeviceListenContext* context =
234             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
235
236         try
237         {
238             OCRepresentation rep = parseGetSetCallback(clientResponse);
239             std::thread exec(context->callback, rep);
240             exec.detach();
241         }
242         catch(OC::OCException& e)
243         {
244             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
245                 <<e.what() <<std::flush;
246         }
247
248         return OC_STACK_KEEP_TRANSACTION;
249     }
250
251     OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
252         const std::string& deviceURI, OCConnectivityType connectivityType,
253         FindDeviceCallback& callback, QualityOfService QoS)
254     {
255         if(!callback)
256         {
257             return OC_STACK_INVALID_PARAM;
258         }
259         OCStackResult result;
260
261         ClientCallbackContext::DeviceListenContext* context =
262             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
263         OCCallbackData cbdata(
264                 static_cast<void*>(context),
265                 listenDeviceCallback,
266                 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
267                 );
268
269         auto cLock = m_csdkLock.lock();
270         if(cLock)
271         {
272             std::lock_guard<std::recursive_mutex> lock(*cLock);
273             result = OCDoResource(nullptr, OC_REST_GET,
274                                   deviceURI.c_str(),
275                                   nullptr, nullptr, connectivityType,
276                                   static_cast<OCQualityOfService>(QoS),
277                                   &cbdata,
278                                   nullptr, 0);
279         }
280         else
281         {
282             delete context;
283             result = OC_STACK_ERROR;
284         }
285         return result;
286     }
287
288     void parseServerHeaderOptions(OCClientResponse* clientResponse,
289                     HeaderOptions& serverHeaderOptions)
290     {
291         if(clientResponse)
292         {
293             // Parse header options from server
294             uint16_t optionID;
295             std::string optionData;
296
297             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
298             {
299                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
300                 optionData = reinterpret_cast<const char*>
301                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
302                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
303                 serverHeaderOptions.push_back(headerOption);
304             }
305         }
306         else
307         {
308             // clientResponse is invalid
309             // TODO check proper logging
310             std::cout << " Invalid response " << std::endl;
311         }
312     }
313
314     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
315         OCClientResponse* clientResponse)
316     {
317         ClientCallbackContext::GetContext* context =
318             static_cast<ClientCallbackContext::GetContext*>(ctx);
319
320         OCRepresentation rep;
321         HeaderOptions serverHeaderOptions;
322         OCStackResult result = clientResponse->result;
323         if(result == OC_STACK_OK)
324         {
325             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
326             try
327             {
328                 rep = parseGetSetCallback(clientResponse);
329             }
330             catch(OC::OCException& e)
331             {
332                 result = e.code();
333             }
334         }
335
336         std::thread exec(context->callback, serverHeaderOptions, rep, result);
337         exec.detach();
338         return OC_STACK_DELETE_TRANSACTION;
339     }
340
341     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
342         const std::string& uri, OCConnectivityType connectivityType,
343         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
344         GetCallback& callback, QualityOfService QoS)
345     {
346         if(!callback)
347         {
348             return OC_STACK_INVALID_PARAM;
349         }
350         OCStackResult result;
351         ClientCallbackContext::GetContext* ctx =
352             new ClientCallbackContext::GetContext(callback);
353         OCCallbackData cbdata(
354                 static_cast<void*>(ctx),
355                 getResourceCallback,
356                 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
357                 );
358
359         auto cLock = m_csdkLock.lock();
360
361         if(cLock)
362         {
363             std::ostringstream os;
364             os << host << assembleSetResourceUri(uri, queryParams).c_str();
365
366             std::lock_guard<std::recursive_mutex> lock(*cLock);
367             OCHeaderOption options[MAX_HEADER_OPTIONS];
368
369             result = OCDoResource(nullptr, OC_REST_GET, os.str().c_str(),
370                                   nullptr, nullptr, connectivityType,
371                                   static_cast<OCQualityOfService>(QoS),
372                                   &cbdata,
373                                   assembleHeaderOptions(options, headerOptions),
374                                   headerOptions.size());
375         }
376         else
377         {
378             delete ctx;
379             result = OC_STACK_ERROR;
380         }
381         return result;
382     }
383
384
385     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
386         OCClientResponse* clientResponse)
387     {
388         ClientCallbackContext::SetContext* context =
389             static_cast<ClientCallbackContext::SetContext*>(ctx);
390         OCRepresentation attrs;
391         HeaderOptions serverHeaderOptions;
392
393         OCStackResult result = clientResponse->result;
394         if (OC_STACK_OK               == result ||
395             OC_STACK_RESOURCE_CREATED == result ||
396             OC_STACK_RESOURCE_DELETED == result)
397         {
398             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
399             try
400             {
401                 attrs = parseGetSetCallback(clientResponse);
402             }
403             catch(OC::OCException& e)
404             {
405                 result = e.code();
406             }
407         }
408
409         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
410         exec.detach();
411         return OC_STACK_DELETE_TRANSACTION;
412     }
413
414     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
415         const QueryParamsMap& queryParams)
416     {
417         if(uri.back() == '/')
418         {
419             uri.resize(uri.size()-1);
420         }
421
422         ostringstream paramsList;
423         if(queryParams.size() > 0)
424         {
425             paramsList << '?';
426         }
427
428         for(auto& param : queryParams)
429         {
430             paramsList << param.first <<'='<<param.second<<';';
431         }
432
433         std::string queryString = paramsList.str();
434         if(queryString.back() == ';')
435         {
436             queryString.resize(queryString.size() - 1);
437         }
438
439         std::string ret = uri + queryString;
440         return ret;
441     }
442
443     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
444     {
445         MessageContainer ocInfo;
446         ocInfo.addRepresentation(rep);
447         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
448     }
449
450     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
451         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
452         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
453         PostCallback& callback, QualityOfService QoS)
454     {
455         if(!callback)
456         {
457             return OC_STACK_INVALID_PARAM;
458         }
459         OCStackResult result;
460         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
461         OCCallbackData cbdata(
462                 static_cast<void*>(ctx),
463                 setResourceCallback,
464                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
465                 );
466
467         // TODO: in the future the cstack should be combining these two strings!
468         ostringstream os;
469         os << host << assembleSetResourceUri(uri, queryParams).c_str();
470         // TODO: end of above
471
472         auto cLock = m_csdkLock.lock();
473
474         if(cLock)
475         {
476             std::lock_guard<std::recursive_mutex> lock(*cLock);
477             OCHeaderOption options[MAX_HEADER_OPTIONS];
478
479             result = OCDoResource(nullptr, OC_REST_POST,
480                                   os.str().c_str(), nullptr,
481                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
482                                   static_cast<OCQualityOfService>(QoS),
483                                   &cbdata,
484                                   assembleHeaderOptions(options, headerOptions),
485                                   headerOptions.size());
486         }
487         else
488         {
489             delete ctx;
490             result = OC_STACK_ERROR;
491         }
492
493         return result;
494     }
495
496     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
497         const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
498         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
499         PutCallback& callback, QualityOfService QoS)
500     {
501         if(!callback)
502         {
503             return OC_STACK_INVALID_PARAM;
504         }
505         OCStackResult result;
506         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
507         OCCallbackData cbdata(
508                 static_cast<void*>(ctx),
509                 setResourceCallback,
510                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
511                 );
512
513         // TODO: in the future the cstack should be combining these two strings!
514         ostringstream os;
515         os << host << assembleSetResourceUri(uri, queryParams).c_str();
516         // TODO: end of above
517
518         auto cLock = m_csdkLock.lock();
519
520         if(cLock)
521         {
522             std::lock_guard<std::recursive_mutex> lock(*cLock);
523             OCDoHandle handle;
524             OCHeaderOption options[MAX_HEADER_OPTIONS];
525
526             result = OCDoResource(&handle, OC_REST_PUT,
527                                   os.str().c_str(), nullptr,
528                                   assembleSetResourcePayload(rep).c_str(), connectivityType,
529                                   static_cast<OCQualityOfService>(QoS),
530                                   &cbdata,
531                                   assembleHeaderOptions(options, headerOptions),
532                                   headerOptions.size());
533         }
534         else
535         {
536             delete ctx;
537             result = OC_STACK_ERROR;
538         }
539
540         return result;
541     }
542
543     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
544         OCClientResponse* clientResponse)
545     {
546         ClientCallbackContext::DeleteContext* context =
547             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
548         HeaderOptions serverHeaderOptions;
549
550         if(clientResponse->result == OC_STACK_OK)
551         {
552             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
553         }
554         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
555         exec.detach();
556         return OC_STACK_DELETE_TRANSACTION;
557     }
558
559     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
560         const std::string& uri, OCConnectivityType connectivityType,
561         const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
562     {
563         if(!callback)
564         {
565             return OC_STACK_INVALID_PARAM;
566         }
567         OCStackResult result;
568         ClientCallbackContext::DeleteContext* ctx =
569             new ClientCallbackContext::DeleteContext(callback);
570         OCCallbackData cbdata(
571                 static_cast<void*>(ctx),
572                 deleteResourceCallback,
573                 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
574                 );
575
576         ostringstream os;
577         os << host << uri;
578
579         auto cLock = m_csdkLock.lock();
580
581         if(cLock)
582         {
583             OCHeaderOption options[MAX_HEADER_OPTIONS];
584
585             std::lock_guard<std::recursive_mutex> lock(*cLock);
586
587             result = OCDoResource(nullptr, OC_REST_DELETE,
588                                   os.str().c_str(), nullptr,
589                                   nullptr, connectivityType,
590                                   static_cast<OCQualityOfService>(m_cfg.QoS),
591                                   &cbdata,
592                                   assembleHeaderOptions(options, headerOptions),
593                                   headerOptions.size());
594         }
595         else
596         {
597             delete ctx;
598             result = OC_STACK_ERROR;
599         }
600
601         return result;
602     }
603
604     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
605         OCClientResponse* clientResponse)
606     {
607         ClientCallbackContext::ObserveContext* context =
608             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
609         OCRepresentation attrs;
610         HeaderOptions serverHeaderOptions;
611         uint32_t sequenceNumber = clientResponse->sequenceNumber;
612         OCStackResult result = clientResponse->result;
613         if(clientResponse->result == OC_STACK_OK)
614         {
615             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
616             try
617             {
618                 attrs = parseGetSetCallback(clientResponse);
619             }
620             catch(OC::OCException& e)
621             {
622                 result = e.code();
623             }
624         }
625         std::thread exec(context->callback, serverHeaderOptions, attrs,
626                     result, sequenceNumber);
627         exec.detach();
628         if(sequenceNumber == OC_OBSERVE_DEREGISTER)
629         {
630             return OC_STACK_DELETE_TRANSACTION;
631         }
632         return OC_STACK_KEEP_TRANSACTION;
633     }
634
635     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
636         const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
637         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
638         ObserveCallback& callback, QualityOfService QoS)
639     {
640         if(!callback)
641         {
642             return OC_STACK_INVALID_PARAM;
643         }
644         OCStackResult result;
645
646         ClientCallbackContext::ObserveContext* ctx =
647             new ClientCallbackContext::ObserveContext(callback);
648         OCCallbackData cbdata(
649                 static_cast<void*>(ctx),
650                 observeResourceCallback,
651                 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
652                 );
653
654         OCMethod method;
655         if (observeType == ObserveType::Observe)
656         {
657             method = OC_REST_OBSERVE;
658         }
659         else if (observeType == ObserveType::ObserveAll)
660         {
661             method = OC_REST_OBSERVE_ALL;
662         }
663         else
664         {
665             method = OC_REST_OBSERVE_ALL;
666         }
667
668         auto cLock = m_csdkLock.lock();
669
670         if(cLock)
671         {
672             std::ostringstream os;
673             os << host << assembleSetResourceUri(uri, queryParams).c_str();
674
675             std::lock_guard<std::recursive_mutex> lock(*cLock);
676             OCHeaderOption options[MAX_HEADER_OPTIONS];
677
678             result = OCDoResource(handle, method,
679                                   os.str().c_str(), nullptr,
680                                   nullptr, connectivityType,
681                                   static_cast<OCQualityOfService>(QoS),
682                                   &cbdata,
683                                   assembleHeaderOptions(options, headerOptions),
684                                   headerOptions.size());
685         }
686         else
687         {
688             delete ctx;
689             return OC_STACK_ERROR;
690         }
691
692         return result;
693     }
694
695     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
696         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
697         QualityOfService QoS)
698     {
699         OCStackResult result;
700         auto cLock = m_csdkLock.lock();
701
702         if(cLock)
703         {
704             std::lock_guard<std::recursive_mutex> lock(*cLock);
705             OCHeaderOption options[MAX_HEADER_OPTIONS];
706
707             result = OCCancel(handle,
708                     static_cast<OCQualityOfService>(QoS),
709                     assembleHeaderOptions(options, headerOptions),
710                     headerOptions.size());
711         }
712         else
713         {
714             result = OC_STACK_ERROR;
715         }
716
717         return result;
718     }
719
720     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
721             OCClientResponse* clientResponse)
722     {
723         ostringstream os;
724         uint16_t port;
725         uint8_t a;
726         uint8_t b;
727         uint8_t c;
728         uint8_t d;
729
730         if(OCDevAddrToIPv4Addr(clientResponse->addr, &a, &b, &c, &d) == 0 &&
731                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
732         {
733             os<<static_cast<int>(a)<<"."<<static_cast<int>(b)<<"."<<static_cast<int>(c)
734                     <<"."<<static_cast<int>(d)<<":"<<static_cast<int>(port);
735
736             ClientCallbackContext::SubscribePresenceContext* context =
737                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
738
739             std::thread exec(context->callback, clientResponse->result,
740                     clientResponse->sequenceNumber, os.str());
741
742             exec.detach();
743         }
744         else
745         {
746             oclog() << "subscribePresenceCallback(): OCDevAddrToIPv4Addr() or OCDevAddrToPort() "
747                     <<"failed"<< std::flush;
748         }
749         return OC_STACK_KEEP_TRANSACTION;
750     }
751
752     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
753         const std::string& host, const std::string& resourceType,
754         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
755     {
756         if(!presenceHandler)
757         {
758             return OC_STACK_INVALID_PARAM;
759         }
760
761         ClientCallbackContext::SubscribePresenceContext* ctx =
762             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
763         OCCallbackData cbdata(
764                 static_cast<void*>(ctx),
765                 subscribePresenceCallback,
766                 [](void* c)
767                 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
768                 );
769
770         auto cLock = m_csdkLock.lock();
771
772         std::ostringstream os;
773         os << host << OC_PRESENCE_URI;
774
775         if(!resourceType.empty())
776         {
777             os << "?rt=" << resourceType;
778         }
779
780         if(!cLock)
781         {
782             delete ctx;
783             return OC_STACK_ERROR;
784         }
785
786         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
787                             connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
788     }
789
790     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
791     {
792         OCStackResult result;
793         auto cLock = m_csdkLock.lock();
794
795         if(cLock)
796         {
797             std::lock_guard<std::recursive_mutex> lock(*cLock);
798             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
799         }
800         else
801         {
802             result = OC_STACK_ERROR;
803         }
804
805         return result;
806     }
807
808     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
809     {
810         qos = m_cfg.QoS;
811         return OC_STACK_OK;
812     }
813
814     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
815            const HeaderOptions& headerOptions)
816     {
817         int i = 0;
818
819         if( headerOptions.size() == 0)
820         {
821             return nullptr;
822         }
823
824         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
825         {
826             options[i] = OCHeaderOption(OC_COAP_ID,
827                     it->getOptionID(),
828                     it->getOptionData().length() + 1,
829                     reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));
830             i++;
831         }
832
833         return options;
834     }
835 }