Merge branch 'master' into resource-manipulation
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCTransportFlags serverFlags =
42                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
43             OCTransportFlags clientFlags =
44                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
45             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
46
47             if(OC_STACK_OK != result)
48             {
49                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
50             }
51
52             m_threadRun = true;
53             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
54         }
55     }
56
57     InProcClientWrapper::~InProcClientWrapper()
58     {
59         if(m_threadRun && m_listeningThread.joinable())
60         {
61             m_threadRun = false;
62             m_listeningThread.join();
63         }
64
65         // only stop if we are the ones who actually called 'init'.  We are counting
66         // on the server to do the stop.
67         if(m_cfg.mode == ModeType::Client)
68         {
69             OCStop();
70         }
71     }
72
73     void InProcClientWrapper::listeningFunc()
74     {
75         while(m_threadRun)
76         {
77             OCStackResult result;
78             auto cLock = m_csdkLock.lock();
79             if(cLock)
80             {
81                 std::lock_guard<std::recursive_mutex> lock(*cLock);
82                 result = OCProcess();
83             }
84             else
85             {
86                 result = OC_STACK_ERROR;
87             }
88
89             if(result != OC_STACK_OK)
90             {
91                 // TODO: do something with result if failed?
92             }
93
94             // To minimize CPU utilization we may wish to do this with sleep
95             std::this_thread::sleep_for(std::chrono::milliseconds(10));
96         }
97     }
98
99     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
100     {
101         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
102         {
103             return OCRepresentation();
104         }
105
106         MessageContainer oc;
107         try
108         {
109             oc.setJSONRepresentation(clientResponse->resJSONPayload);
110         }
111         catch (cereal::RapidJSONException& ex)
112         {
113             oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
114                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
115             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
116         }
117         catch (cereal::Exception& ex)
118         {
119             oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
120                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
121             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
122         }
123
124         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
125         if(it == oc.representations().end())
126         {
127             return OCRepresentation();
128         }
129
130         // first one is considered the root, everything else is considered a child of this one.
131         OCRepresentation root = *it;
132         ++it;
133
134         std::for_each(it, oc.representations().end(),
135                 [&root](const OCRepresentation& repItr)
136                 {root.addChild(repItr);});
137         return root;
138
139     }
140
141     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
142         OCClientResponse* clientResponse)
143     {
144         ClientCallbackContext::ListenContext* context =
145             static_cast<ClientCallbackContext::ListenContext*>(ctx);
146
147         if(clientResponse->result != OC_STACK_OK)
148         {
149             oclog() << "listenCallback(): failed to create resource. clientResponse: "
150                     << clientResponse->result
151                     << std::flush;
152
153             return OC_STACK_KEEP_TRANSACTION;
154         }
155
156         auto clientWrapper = context->clientWrapper.lock();
157
158         if(!clientWrapper)
159         {
160             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
161                     << std::flush;
162             return OC_STACK_KEEP_TRANSACTION;
163         }
164
165         std::stringstream requestStream;
166         requestStream << clientResponse->resJSONPayload;
167
168         try
169         {
170
171             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
172                                     requestStream);
173             // loop to ensure valid construction of all resources
174             for(auto resource : container.Resources())
175             {
176                 std::thread exec(context->callback, resource);
177                 exec.detach();
178             }
179
180         }
181         catch(const std::exception& e)
182         {
183             oclog() << "listenCallback failed to parse a malformed message: "
184                     << e.what()
185                     << std::endl
186                     << clientResponse->resJSONPayload
187                     << std::endl
188                     << clientResponse->result
189                     << std::flush;
190             return OC_STACK_KEEP_TRANSACTION;
191         }
192
193         return OC_STACK_KEEP_TRANSACTION;
194     }
195
196     OCStackResult InProcClientWrapper::ListenForResource(
197             const std::string& serviceUrl,  // unused
198             const std::string& resourceType,
199             OCConnectivityType connectivityType,
200             FindCallback& callback, QualityOfService QoS)
201     {
202         if(!callback)
203         {
204             return OC_STACK_INVALID_PARAM;
205         }
206
207         OCStackResult result;
208
209         ClientCallbackContext::ListenContext* context =
210             new ClientCallbackContext::ListenContext(callback, shared_from_this());
211         OCCallbackData cbdata(
212                 static_cast<void*>(context),
213                 listenCallback,
214                 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
215             );
216
217         auto cLock = m_csdkLock.lock();
218         if(cLock)
219         {
220             std::lock_guard<std::recursive_mutex> lock(*cLock);
221             result = OCDoResource(nullptr, OC_REST_DISCOVER,
222                                   resourceType.c_str(),
223                                   nullptr, nullptr, connectivityType,
224                                   static_cast<OCQualityOfService>(QoS),
225                                   &cbdata,
226                                   nullptr, 0);
227         }
228         else
229         {
230             delete context;
231             result = OC_STACK_ERROR;
232         }
233         return result;
234     }
235
236     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
237             OCClientResponse* clientResponse)
238     {
239         ClientCallbackContext::DeviceListenContext* context =
240             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
241
242         try
243         {
244             OCRepresentation rep = parseGetSetCallback(clientResponse);
245             std::thread exec(context->callback, rep);
246             exec.detach();
247         }
248         catch(OC::OCException& e)
249         {
250             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
251                 <<e.what() <<std::flush;
252         }
253
254         return OC_STACK_KEEP_TRANSACTION;
255     }
256
257     OCStackResult InProcClientWrapper::ListenForDevice(
258             const std::string& serviceUrl,  // unused
259             const std::string& deviceURI,
260             OCConnectivityType connectivityType,
261             FindDeviceCallback& callback,
262             QualityOfService QoS)
263     {
264         if(!callback)
265         {
266             return OC_STACK_INVALID_PARAM;
267         }
268         OCStackResult result;
269
270         ClientCallbackContext::DeviceListenContext* context =
271             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
272         OCCallbackData cbdata(
273                 static_cast<void*>(context),
274                 listenDeviceCallback,
275                 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
276                 );
277
278         auto cLock = m_csdkLock.lock();
279         if(cLock)
280         {
281             std::lock_guard<std::recursive_mutex> lock(*cLock);
282             result = OCDoResource(nullptr, OC_REST_DISCOVER,
283                                   deviceURI.c_str(),
284                                   nullptr, nullptr, connectivityType,
285                                   static_cast<OCQualityOfService>(QoS),
286                                   &cbdata,
287                                   nullptr, 0);
288         }
289         else
290         {
291             delete context;
292             result = OC_STACK_ERROR;
293         }
294         return result;
295     }
296
297     void parseServerHeaderOptions(OCClientResponse* clientResponse,
298                     HeaderOptions& serverHeaderOptions)
299     {
300         if(clientResponse)
301         {
302             // Parse header options from server
303             uint16_t optionID;
304             std::string optionData;
305
306             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
307             {
308                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
309                 optionData = reinterpret_cast<const char*>
310                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
311                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
312                 serverHeaderOptions.push_back(headerOption);
313             }
314         }
315         else
316         {
317             // clientResponse is invalid
318             // TODO check proper logging
319             std::cout << " Invalid response " << std::endl;
320         }
321     }
322
323     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
324         OCClientResponse* clientResponse)
325     {
326         ClientCallbackContext::GetContext* context =
327             static_cast<ClientCallbackContext::GetContext*>(ctx);
328
329         OCRepresentation rep;
330         HeaderOptions serverHeaderOptions;
331         OCStackResult result = clientResponse->result;
332         if(result == OC_STACK_OK)
333         {
334             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
335             try
336             {
337                 rep = parseGetSetCallback(clientResponse);
338             }
339             catch(OC::OCException& e)
340             {
341                 result = e.code();
342             }
343         }
344
345         std::thread exec(context->callback, serverHeaderOptions, rep, result);
346         exec.detach();
347         return OC_STACK_DELETE_TRANSACTION;
348     }
349
350     OCStackResult InProcClientWrapper::GetResourceRepresentation(
351         const OCDevAddr& devAddr,
352         const std::string& resourceUri,
353         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
354         GetCallback& callback, QualityOfService QoS)
355     {
356         if(!callback)
357         {
358             return OC_STACK_INVALID_PARAM;
359         }
360         OCStackResult result;
361         ClientCallbackContext::GetContext* ctx =
362             new ClientCallbackContext::GetContext(callback);
363         OCCallbackData cbdata(
364                 static_cast<void*>(ctx),
365                 getResourceCallback,
366                 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
367                 );
368
369         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
370
371         auto cLock = m_csdkLock.lock();
372
373         if(cLock)
374         {
375             std::lock_guard<std::recursive_mutex> lock(*cLock);
376             OCHeaderOption options[MAX_HEADER_OPTIONS];
377
378             result = OCDoResource(nullptr, OC_REST_GET,
379                                   uri.c_str(),
380                                   &devAddr, nullptr,
381                                   CT_DEFAULT,
382                                   static_cast<OCQualityOfService>(QoS),
383                                   &cbdata,
384                                   assembleHeaderOptions(options, headerOptions),
385                                   headerOptions.size());
386         }
387         else
388         {
389             delete ctx;
390             result = OC_STACK_ERROR;
391         }
392         return result;
393     }
394
395
396     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
397         OCClientResponse* clientResponse)
398     {
399         ClientCallbackContext::SetContext* context =
400             static_cast<ClientCallbackContext::SetContext*>(ctx);
401         OCRepresentation attrs;
402         HeaderOptions serverHeaderOptions;
403
404         OCStackResult result = clientResponse->result;
405         if (OC_STACK_OK               == result ||
406             OC_STACK_RESOURCE_CREATED == result ||
407             OC_STACK_RESOURCE_DELETED == result)
408         {
409             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
410             try
411             {
412                 attrs = parseGetSetCallback(clientResponse);
413             }
414             catch(OC::OCException& e)
415             {
416                 result = e.code();
417             }
418         }
419
420         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
421         exec.detach();
422         return OC_STACK_DELETE_TRANSACTION;
423     }
424
425     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
426         const QueryParamsMap& queryParams)
427     {
428         if(uri.back() == '/')
429         {
430             uri.resize(uri.size()-1);
431         }
432
433         ostringstream paramsList;
434         if(queryParams.size() > 0)
435         {
436             paramsList << '?';
437         }
438
439         for(auto& param : queryParams)
440         {
441             paramsList << param.first <<'='<<param.second<<';';
442         }
443
444         std::string queryString = paramsList.str();
445         if(queryString.back() == ';')
446         {
447             queryString.resize(queryString.size() - 1);
448         }
449
450         std::string ret = uri + queryString;
451         return ret;
452     }
453
454     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
455     {
456         MessageContainer ocInfo;
457         ocInfo.addRepresentation(rep);
458         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
459     }
460
461     OCStackResult InProcClientWrapper::PostResourceRepresentation(
462         const OCDevAddr& devAddr,
463         const std::string& uri,
464         const OCRepresentation& rep,
465         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
466         PostCallback& callback, QualityOfService QoS)
467     {
468         if(!callback)
469         {
470             return OC_STACK_INVALID_PARAM;
471         }
472         OCStackResult result;
473         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
474         OCCallbackData cbdata(
475                 static_cast<void*>(ctx),
476                 setResourceCallback,
477                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
478                 );
479
480         std::string url = assembleSetResourceUri(uri, queryParams);
481
482         auto cLock = m_csdkLock.lock();
483
484         if(cLock)
485         {
486             std::lock_guard<std::recursive_mutex> lock(*cLock);
487             OCHeaderOption options[MAX_HEADER_OPTIONS];
488
489             result = OCDoResource(nullptr, OC_REST_POST,
490                                   url.c_str(), &devAddr,
491                                   assembleSetResourcePayload(rep).c_str(),
492                                   CT_DEFAULT,
493                                   static_cast<OCQualityOfService>(QoS),
494                                   &cbdata,
495                                   assembleHeaderOptions(options, headerOptions),
496                                   headerOptions.size());
497         }
498         else
499         {
500             delete ctx;
501             result = OC_STACK_ERROR;
502         }
503
504         return result;
505     }
506
507     OCStackResult InProcClientWrapper::PutResourceRepresentation(
508         const OCDevAddr& devAddr,
509         const std::string& uri,
510         const OCRepresentation& rep,
511         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
512         PutCallback& callback, QualityOfService QoS)
513     {
514         if(!callback)
515         {
516             return OC_STACK_INVALID_PARAM;
517         }
518         OCStackResult result;
519         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
520         OCCallbackData cbdata(
521                 static_cast<void*>(ctx),
522                 setResourceCallback,
523                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
524                 );
525
526         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
527
528         auto cLock = m_csdkLock.lock();
529
530         if(cLock)
531         {
532             std::lock_guard<std::recursive_mutex> lock(*cLock);
533             OCDoHandle handle;
534             OCHeaderOption options[MAX_HEADER_OPTIONS];
535
536             result = OCDoResource(&handle, OC_REST_PUT,
537                                   url.c_str(), &devAddr,
538                                   assembleSetResourcePayload(rep).c_str(),
539                                   CT_DEFAULT,
540                                   static_cast<OCQualityOfService>(QoS),
541                                   &cbdata,
542                                   assembleHeaderOptions(options, headerOptions),
543                                   headerOptions.size());
544         }
545         else
546         {
547             delete ctx;
548             result = OC_STACK_ERROR;
549         }
550
551         return result;
552     }
553
554     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
555         OCClientResponse* clientResponse)
556     {
557         ClientCallbackContext::DeleteContext* context =
558             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
559         HeaderOptions serverHeaderOptions;
560
561         if(clientResponse->result == OC_STACK_OK)
562         {
563             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
564         }
565         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
566         exec.detach();
567         return OC_STACK_DELETE_TRANSACTION;
568     }
569
570     OCStackResult InProcClientWrapper::DeleteResource(
571         const OCDevAddr& devAddr,
572         const std::string& uri,
573         const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
574     {
575         if(!callback)
576         {
577             return OC_STACK_INVALID_PARAM;
578         }
579         OCStackResult result;
580         ClientCallbackContext::DeleteContext* ctx =
581             new ClientCallbackContext::DeleteContext(callback);
582         OCCallbackData cbdata(
583                 static_cast<void*>(ctx),
584                 deleteResourceCallback,
585                 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
586                 );
587
588         auto cLock = m_csdkLock.lock();
589
590         if(cLock)
591         {
592             OCHeaderOption options[MAX_HEADER_OPTIONS];
593
594             std::lock_guard<std::recursive_mutex> lock(*cLock);
595
596             result = OCDoResource(nullptr, OC_REST_DELETE,
597                                   uri.c_str(), &devAddr,
598                                   nullptr,
599                                   CT_DEFAULT,
600                                   static_cast<OCQualityOfService>(m_cfg.QoS),
601                                   &cbdata,
602                                   assembleHeaderOptions(options, headerOptions),
603                                   headerOptions.size());
604         }
605         else
606         {
607             delete ctx;
608             result = OC_STACK_ERROR;
609         }
610
611         return result;
612     }
613
614     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
615         OCClientResponse* clientResponse)
616     {
617         ClientCallbackContext::ObserveContext* context =
618             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
619         OCRepresentation attrs;
620         HeaderOptions serverHeaderOptions;
621         uint32_t sequenceNumber = clientResponse->sequenceNumber;
622         OCStackResult result = clientResponse->result;
623         if(clientResponse->result == OC_STACK_OK)
624         {
625             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
626             try
627             {
628                 attrs = parseGetSetCallback(clientResponse);
629             }
630             catch(OC::OCException& e)
631             {
632                 result = e.code();
633             }
634         }
635         std::thread exec(context->callback, serverHeaderOptions, attrs,
636                     result, sequenceNumber);
637         exec.detach();
638         if(sequenceNumber == OC_OBSERVE_DEREGISTER)
639         {
640             return OC_STACK_DELETE_TRANSACTION;
641         }
642         return OC_STACK_KEEP_TRANSACTION;
643     }
644
645     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
646         const OCDevAddr& devAddr,
647         const std::string& uri,
648         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
649         ObserveCallback& callback, QualityOfService QoS)
650     {
651         if(!callback)
652         {
653             return OC_STACK_INVALID_PARAM;
654         }
655         OCStackResult result;
656
657         ClientCallbackContext::ObserveContext* ctx =
658             new ClientCallbackContext::ObserveContext(callback);
659         OCCallbackData cbdata(
660                 static_cast<void*>(ctx),
661                 observeResourceCallback,
662                 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
663                 );
664
665         OCMethod method;
666         if (observeType == ObserveType::Observe)
667         {
668             method = OC_REST_OBSERVE;
669         }
670         else if (observeType == ObserveType::ObserveAll)
671         {
672             method = OC_REST_OBSERVE_ALL;
673         }
674         else
675         {
676             method = OC_REST_OBSERVE_ALL;
677         }
678
679         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
680
681         auto cLock = m_csdkLock.lock();
682
683         if(cLock)
684         {
685             std::lock_guard<std::recursive_mutex> lock(*cLock);
686             OCHeaderOption options[MAX_HEADER_OPTIONS];
687
688             result = OCDoResource(handle, method,
689                                   url.c_str(), &devAddr,
690                                   nullptr,
691                                   CT_DEFAULT,
692                                   static_cast<OCQualityOfService>(QoS),
693                                   &cbdata,
694                                   assembleHeaderOptions(options, headerOptions),
695                                   headerOptions.size());
696         }
697         else
698         {
699             delete ctx;
700             return OC_STACK_ERROR;
701         }
702
703         return result;
704     }
705
706     OCStackResult InProcClientWrapper::CancelObserveResource(
707             OCDoHandle handle,
708             const std::string& host, // unused
709             const std::string& uri,  // unused
710             const HeaderOptions& headerOptions,
711             QualityOfService QoS)
712     {
713         OCStackResult result;
714         auto cLock = m_csdkLock.lock();
715
716         if(cLock)
717         {
718             std::lock_guard<std::recursive_mutex> lock(*cLock);
719             OCHeaderOption options[MAX_HEADER_OPTIONS];
720
721             result = OCCancel(handle,
722                     static_cast<OCQualityOfService>(QoS),
723                     assembleHeaderOptions(options, headerOptions),
724                     headerOptions.size());
725         }
726         else
727         {
728             result = OC_STACK_ERROR;
729         }
730
731         return result;
732     }
733
734     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
735             OCClientResponse* clientResponse)
736     {
737         ClientCallbackContext::SubscribePresenceContext* context =
738         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
739
740         /*
741          * This a hack while we rethink presence subscription.
742          */
743         std::string url = clientResponse->devAddr.addr;
744
745         std::thread exec(context->callback, clientResponse->result,
746                     clientResponse->sequenceNumber, url);
747
748         exec.detach();
749
750         return OC_STACK_KEEP_TRANSACTION;
751     }
752
753     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
754         const std::string& host, const std::string& resourceType,
755         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
756     {
757         if(!presenceHandler)
758         {
759             return OC_STACK_INVALID_PARAM;
760         }
761
762         ClientCallbackContext::SubscribePresenceContext* ctx =
763             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
764         OCCallbackData cbdata(
765                 static_cast<void*>(ctx),
766                 subscribePresenceCallback,
767                 [](void* c)
768                 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
769                 );
770
771         auto cLock = m_csdkLock.lock();
772
773         std::ostringstream os;
774         os << host << "/oc/presence";
775
776         if(!resourceType.empty())
777         {
778             os << "?rt=" << resourceType;
779         }
780
781         if(!cLock)
782         {
783             delete ctx;
784             return OC_STACK_ERROR;
785         }
786
787         return OCDoResource(handle, OC_REST_PRESENCE,
788                             os.str().c_str(), nullptr,
789                             nullptr, connectivityType,
790                             OC_LOW_QOS, &cbdata, NULL, 0);
791     }
792
793     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
794     {
795         OCStackResult result;
796         auto cLock = m_csdkLock.lock();
797
798         if(cLock)
799         {
800             std::lock_guard<std::recursive_mutex> lock(*cLock);
801             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
802         }
803         else
804         {
805             result = OC_STACK_ERROR;
806         }
807
808         return result;
809     }
810
811     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
812     {
813         qos = m_cfg.QoS;
814         return OC_STACK_OK;
815     }
816
817     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
818            const HeaderOptions& headerOptions)
819     {
820         int i = 0;
821
822         if( headerOptions.size() == 0)
823         {
824             return nullptr;
825         }
826
827         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
828         {
829             options[i] = OCHeaderOption(OC_COAP_ID,
830                     it->getOptionID(),
831                     it->getOptionData().length() + 1,
832                     reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));
833             i++;
834         }
835
836         return options;
837     }
838 }