737be6361b128379297ad9c06caddf0521a17b1b
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCTransportFlags serverFlags =
42                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
43             OCTransportFlags clientFlags =
44                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
45             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
46
47             if(OC_STACK_OK != result)
48             {
49                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
50             }
51
52             m_threadRun = true;
53             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
54         }
55     }
56
57     InProcClientWrapper::~InProcClientWrapper()
58     {
59         if(m_threadRun && m_listeningThread.joinable())
60         {
61             m_threadRun = false;
62             m_listeningThread.join();
63         }
64
65         // only stop if we are the ones who actually called 'init'.  We are counting
66         // on the server to do the stop.
67         if(m_cfg.mode == ModeType::Client)
68         {
69             OCStop();
70         }
71     }
72
73     void InProcClientWrapper::listeningFunc()
74     {
75         while(m_threadRun)
76         {
77             OCStackResult result;
78             auto cLock = m_csdkLock.lock();
79             if(cLock)
80             {
81                 std::lock_guard<std::recursive_mutex> lock(*cLock);
82                 result = OCProcess();
83             }
84             else
85             {
86                 result = OC_STACK_ERROR;
87             }
88
89             if(result != OC_STACK_OK)
90             {
91                 // TODO: do something with result if failed?
92             }
93
94             // To minimize CPU utilization we may wish to do this with sleep
95             std::this_thread::sleep_for(std::chrono::milliseconds(10));
96         }
97     }
98
99     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
100     {
101         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
102         {
103             return OCRepresentation();
104         }
105
106         MessageContainer oc;
107         try
108         {
109             oc.setJSONRepresentation(clientResponse->resJSONPayload);
110         }
111         catch (cereal::RapidJSONException& ex)
112         {
113             oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
114                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
115             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
116         }
117         catch (cereal::Exception& ex)
118         {
119             oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
120                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
121             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
122         }
123
124         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
125         if(it == oc.representations().end())
126         {
127             return OCRepresentation();
128         }
129
130         // first one is considered the root, everything else is considered a child of this one.
131         OCRepresentation root = *it;
132         ++it;
133
134         std::for_each(it, oc.representations().end(),
135                 [&root](const OCRepresentation& repItr)
136                 {root.addChild(repItr);});
137         return root;
138
139     }
140
141     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
142         OCClientResponse* clientResponse)
143     {
144         ClientCallbackContext::ListenContext* context =
145             static_cast<ClientCallbackContext::ListenContext*>(ctx);
146
147         if(clientResponse->result != OC_STACK_OK)
148         {
149             oclog() << "listenCallback(): failed to create resource. clientResponse: "
150                     << clientResponse->result
151                     << std::flush;
152
153             return OC_STACK_KEEP_TRANSACTION;
154         }
155
156         auto clientWrapper = context->clientWrapper.lock();
157
158         if(!clientWrapper)
159         {
160             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
161                     << std::flush;
162             return OC_STACK_KEEP_TRANSACTION;
163         }
164
165         std::stringstream requestStream;
166         requestStream << clientResponse->resJSONPayload;
167
168         try
169         {
170
171             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
172                                     requestStream);
173             // loop to ensure valid construction of all resources
174             for(auto resource : container.Resources())
175             {
176                 std::thread exec(context->callback, resource);
177                 exec.detach();
178             }
179
180         }
181         catch(const std::exception& e)
182         {
183             oclog() << "listenCallback failed to parse a malformed message: "
184                     << e.what()
185                     << std::endl
186                     << clientResponse->resJSONPayload
187                     << std::endl
188                     << clientResponse->result
189                     << std::flush;
190             return OC_STACK_KEEP_TRANSACTION;
191         }
192
193         return OC_STACK_KEEP_TRANSACTION;
194     }
195
196     OCStackResult InProcClientWrapper::ListenForResource(
197             const std::string& serviceUrl,  // unused
198             const std::string& resourceType,
199             OCConnectivityType connectivityType,
200             FindCallback& callback, QualityOfService QoS)
201     {
202         if(!callback)
203         {
204             return OC_STACK_INVALID_PARAM;
205         }
206
207         OCStackResult result;
208
209         ClientCallbackContext::ListenContext* context =
210             new ClientCallbackContext::ListenContext(callback, shared_from_this());
211         OCCallbackData cbdata(
212                 static_cast<void*>(context),
213                 listenCallback,
214                 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
215             );
216
217         auto cLock = m_csdkLock.lock();
218         if(cLock)
219         {
220             std::lock_guard<std::recursive_mutex> lock(*cLock);
221             result = OCDoResource(nullptr, OC_REST_DISCOVER,
222                                   resourceType.c_str(),
223                                   nullptr, nullptr, connectivityType,
224                                   static_cast<OCQualityOfService>(QoS),
225                                   &cbdata,
226                                   nullptr, 0);
227         }
228         else
229         {
230             delete context;
231             result = OC_STACK_ERROR;
232         }
233         return result;
234     }
235
236     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
237             OCClientResponse* clientResponse)
238     {
239         ClientCallbackContext::DeviceListenContext* context =
240             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
241
242         try
243         {
244             OCRepresentation rep = parseGetSetCallback(clientResponse);
245             std::thread exec(context->callback, rep);
246             exec.detach();
247         }
248         catch(OC::OCException& e)
249         {
250             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
251                 <<e.what() <<std::flush;
252         }
253
254         return OC_STACK_KEEP_TRANSACTION;
255     }
256
257     OCStackResult InProcClientWrapper::ListenForDevice(
258             const std::string& serviceUrl,  // unused
259             const std::string& deviceURI,
260             OCConnectivityType connectivityType,
261             FindDeviceCallback& callback,
262             QualityOfService QoS)
263     {
264         if(!callback)
265         {
266             return OC_STACK_INVALID_PARAM;
267         }
268         OCStackResult result;
269
270         ClientCallbackContext::DeviceListenContext* context =
271             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
272         OCCallbackData cbdata(
273                 static_cast<void*>(context),
274                 listenDeviceCallback,
275                 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
276                 );
277
278         auto cLock = m_csdkLock.lock();
279         if(cLock)
280         {
281             std::lock_guard<std::recursive_mutex> lock(*cLock);
282             result = OCDoResource(nullptr, OC_REST_DISCOVER,
283                                   deviceURI.c_str(),
284                                   nullptr, nullptr, connectivityType,
285                                   static_cast<OCQualityOfService>(QoS),
286                                   &cbdata,
287                                   nullptr, 0);
288         }
289         else
290         {
291             delete context;
292             result = OC_STACK_ERROR;
293         }
294         return result;
295     }
296
297     void parseServerHeaderOptions(OCClientResponse* clientResponse,
298                     HeaderOptions& serverHeaderOptions)
299     {
300         if(clientResponse)
301         {
302             // Parse header options from server
303             uint16_t optionID;
304             std::string optionData;
305
306             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
307             {
308                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
309                 optionData = reinterpret_cast<const char*>
310                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
311                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
312                 serverHeaderOptions.push_back(headerOption);
313             }
314         }
315         else
316         {
317             // clientResponse is invalid
318             // TODO check proper logging
319             std::cout << " Invalid response " << std::endl;
320         }
321     }
322
323     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
324         OCClientResponse* clientResponse)
325     {
326         ClientCallbackContext::GetContext* context =
327             static_cast<ClientCallbackContext::GetContext*>(ctx);
328
329         OCRepresentation rep;
330         HeaderOptions serverHeaderOptions;
331         OCStackResult result = clientResponse->result;
332         if(result == OC_STACK_OK)
333         {
334             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
335             try
336             {
337                 rep = parseGetSetCallback(clientResponse);
338             }
339             catch(OC::OCException& e)
340             {
341                 result = e.code();
342             }
343         }
344
345         std::thread exec(context->callback, serverHeaderOptions, rep, result);
346         exec.detach();
347         return OC_STACK_DELETE_TRANSACTION;
348     }
349
350     OCStackResult InProcClientWrapper::GetResourceRepresentation(
351         const OCDevAddr& devAddr,
352         const std::string& resourceUri,
353         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
354         GetCallback& callback, QualityOfService QoS)
355     {
356         if(!callback)
357         {
358             return OC_STACK_INVALID_PARAM;
359         }
360         OCStackResult result;
361         ClientCallbackContext::GetContext* ctx =
362             new ClientCallbackContext::GetContext(callback);
363         OCCallbackData cbdata(
364                 static_cast<void*>(ctx),
365                 getResourceCallback,
366                 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
367                 );
368
369         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
370
371         auto cLock = m_csdkLock.lock();
372
373         if(cLock)
374         {
375             std::lock_guard<std::recursive_mutex> lock(*cLock);
376             OCHeaderOption options[MAX_HEADER_OPTIONS];
377
378             result = OCDoResource(
379                                   nullptr, OC_REST_GET,
380                                   uri.c_str(),
381                                   &devAddr, nullptr,
382                                   CT_DEFAULT,
383                                   static_cast<OCQualityOfService>(QoS),
384                                   &cbdata,
385                                   assembleHeaderOptions(options, headerOptions),
386                                   headerOptions.size());
387         }
388         else
389         {
390             delete ctx;
391             result = OC_STACK_ERROR;
392         }
393         return result;
394     }
395
396
397     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
398         OCClientResponse* clientResponse)
399     {
400         ClientCallbackContext::SetContext* context =
401             static_cast<ClientCallbackContext::SetContext*>(ctx);
402         OCRepresentation attrs;
403         HeaderOptions serverHeaderOptions;
404
405         OCStackResult result = clientResponse->result;
406         if (OC_STACK_OK               == result ||
407             OC_STACK_RESOURCE_CREATED == result ||
408             OC_STACK_RESOURCE_DELETED == result)
409         {
410             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
411             try
412             {
413                 attrs = parseGetSetCallback(clientResponse);
414             }
415             catch(OC::OCException& e)
416             {
417                 result = e.code();
418             }
419         }
420
421         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
422         exec.detach();
423         return OC_STACK_DELETE_TRANSACTION;
424     }
425
426     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
427         const QueryParamsMap& queryParams)
428     {
429         if(uri.back() == '/')
430         {
431             uri.resize(uri.size()-1);
432         }
433
434         ostringstream paramsList;
435         if(queryParams.size() > 0)
436         {
437             paramsList << '?';
438         }
439
440         for(auto& param : queryParams)
441         {
442             paramsList << param.first <<'='<<param.second<<';';
443         }
444
445         std::string queryString = paramsList.str();
446         if(queryString.back() == ';')
447         {
448             queryString.resize(queryString.size() - 1);
449         }
450
451         std::string ret = uri + queryString;
452         return ret;
453     }
454
455     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
456     {
457         MessageContainer ocInfo;
458         ocInfo.addRepresentation(rep);
459         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
460     }
461
462     OCStackResult InProcClientWrapper::PostResourceRepresentation(
463         const OCDevAddr& devAddr,
464         const std::string& uri,
465         const OCRepresentation& rep,
466         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
467         PostCallback& callback, QualityOfService QoS)
468     {
469         if(!callback)
470         {
471             return OC_STACK_INVALID_PARAM;
472         }
473         OCStackResult result;
474         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
475         OCCallbackData cbdata(
476                 static_cast<void*>(ctx),
477                 setResourceCallback,
478                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
479                 );
480
481         std::string url = assembleSetResourceUri(uri, queryParams);
482
483         auto cLock = m_csdkLock.lock();
484
485         if(cLock)
486         {
487             std::lock_guard<std::recursive_mutex> lock(*cLock);
488             OCHeaderOption options[MAX_HEADER_OPTIONS];
489
490             result = OCDoResource(nullptr, OC_REST_POST,
491                                   url.c_str(), &devAddr,
492                                   assembleSetResourcePayload(rep).c_str(),
493                                   CT_DEFAULT,
494                                   static_cast<OCQualityOfService>(QoS),
495                                   &cbdata,
496                                   assembleHeaderOptions(options, headerOptions),
497                                   headerOptions.size());
498         }
499         else
500         {
501             delete ctx;
502             result = OC_STACK_ERROR;
503         }
504
505         return result;
506     }
507
508     OCStackResult InProcClientWrapper::PutResourceRepresentation(
509         const OCDevAddr& devAddr,
510         const std::string& uri,
511         const OCRepresentation& rep,
512         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
513         PutCallback& callback, QualityOfService QoS)
514     {
515         if(!callback)
516         {
517             return OC_STACK_INVALID_PARAM;
518         }
519         OCStackResult result;
520         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
521         OCCallbackData cbdata(
522                 static_cast<void*>(ctx),
523                 setResourceCallback,
524                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
525                 );
526
527         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
528
529         auto cLock = m_csdkLock.lock();
530
531         if(cLock)
532         {
533             std::lock_guard<std::recursive_mutex> lock(*cLock);
534             OCDoHandle handle;
535             OCHeaderOption options[MAX_HEADER_OPTIONS];
536
537             result = OCDoResource(&handle, OC_REST_PUT,
538                                   url.c_str(), &devAddr,
539                                   assembleSetResourcePayload(rep).c_str(),
540                                   CT_DEFAULT,
541                                   static_cast<OCQualityOfService>(QoS),
542                                   &cbdata,
543                                   assembleHeaderOptions(options, headerOptions),
544                                   headerOptions.size());
545         }
546         else
547         {
548             delete ctx;
549             result = OC_STACK_ERROR;
550         }
551
552         return result;
553     }
554
555     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
556         OCClientResponse* clientResponse)
557     {
558         ClientCallbackContext::DeleteContext* context =
559             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
560         HeaderOptions serverHeaderOptions;
561
562         if(clientResponse->result == OC_STACK_OK)
563         {
564             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
565         }
566         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
567         exec.detach();
568         return OC_STACK_DELETE_TRANSACTION;
569     }
570
571     OCStackResult InProcClientWrapper::DeleteResource(
572         const OCDevAddr& devAddr,
573         const std::string& uri,
574         const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
575     {
576         if(!callback)
577         {
578             return OC_STACK_INVALID_PARAM;
579         }
580         OCStackResult result;
581         ClientCallbackContext::DeleteContext* ctx =
582             new ClientCallbackContext::DeleteContext(callback);
583         OCCallbackData cbdata(
584                 static_cast<void*>(ctx),
585                 deleteResourceCallback,
586                 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
587                 );
588
589         auto cLock = m_csdkLock.lock();
590
591         if(cLock)
592         {
593             OCHeaderOption options[MAX_HEADER_OPTIONS];
594
595             std::lock_guard<std::recursive_mutex> lock(*cLock);
596
597             result = OCDoResource(nullptr, OC_REST_DELETE,
598                                   uri.c_str(), &devAddr,
599                                   nullptr,
600                                   CT_DEFAULT,
601                                   static_cast<OCQualityOfService>(m_cfg.QoS),
602                                   &cbdata,
603                                   assembleHeaderOptions(options, headerOptions),
604                                   headerOptions.size());
605         }
606         else
607         {
608             delete ctx;
609             result = OC_STACK_ERROR;
610         }
611
612         return result;
613     }
614
615     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
616         OCClientResponse* clientResponse)
617     {
618         ClientCallbackContext::ObserveContext* context =
619             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
620         OCRepresentation attrs;
621         HeaderOptions serverHeaderOptions;
622         uint32_t sequenceNumber = clientResponse->sequenceNumber;
623         OCStackResult result = clientResponse->result;
624         if(clientResponse->result == OC_STACK_OK)
625         {
626             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
627             try
628             {
629                 attrs = parseGetSetCallback(clientResponse);
630             }
631             catch(OC::OCException& e)
632             {
633                 result = e.code();
634             }
635         }
636         std::thread exec(context->callback, serverHeaderOptions, attrs,
637                     result, sequenceNumber);
638         exec.detach();
639         if(sequenceNumber == OC_OBSERVE_DEREGISTER)
640         {
641             return OC_STACK_DELETE_TRANSACTION;
642         }
643         return OC_STACK_KEEP_TRANSACTION;
644     }
645
646     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
647         const OCDevAddr& devAddr,
648         const std::string& uri,
649         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
650         ObserveCallback& callback, QualityOfService QoS)
651     {
652         if(!callback)
653         {
654             return OC_STACK_INVALID_PARAM;
655         }
656         OCStackResult result;
657
658         ClientCallbackContext::ObserveContext* ctx =
659             new ClientCallbackContext::ObserveContext(callback);
660         OCCallbackData cbdata(
661                 static_cast<void*>(ctx),
662                 observeResourceCallback,
663                 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
664                 );
665
666         OCMethod method;
667         if (observeType == ObserveType::Observe)
668         {
669             method = OC_REST_OBSERVE;
670         }
671         else if (observeType == ObserveType::ObserveAll)
672         {
673             method = OC_REST_OBSERVE_ALL;
674         }
675         else
676         {
677             method = OC_REST_OBSERVE_ALL;
678         }
679
680         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
681
682         auto cLock = m_csdkLock.lock();
683
684         if(cLock)
685         {
686             std::lock_guard<std::recursive_mutex> lock(*cLock);
687             OCHeaderOption options[MAX_HEADER_OPTIONS];
688
689             result = OCDoResource(handle, method,
690                                   url.c_str(), &devAddr,
691                                   nullptr,
692                                   CT_DEFAULT,
693                                   static_cast<OCQualityOfService>(QoS),
694                                   &cbdata,
695                                   assembleHeaderOptions(options, headerOptions),
696                                   headerOptions.size());
697         }
698         else
699         {
700             delete ctx;
701             return OC_STACK_ERROR;
702         }
703
704         return result;
705     }
706
707     OCStackResult InProcClientWrapper::CancelObserveResource(
708             OCDoHandle handle,
709             const std::string& host, // unused
710             const std::string& uri,  // unused
711             const HeaderOptions& headerOptions,
712             QualityOfService QoS)
713     {
714         OCStackResult result;
715         auto cLock = m_csdkLock.lock();
716
717         if(cLock)
718         {
719             std::lock_guard<std::recursive_mutex> lock(*cLock);
720             OCHeaderOption options[MAX_HEADER_OPTIONS];
721
722             result = OCCancel(handle,
723                     static_cast<OCQualityOfService>(QoS),
724                     assembleHeaderOptions(options, headerOptions),
725                     headerOptions.size());
726         }
727         else
728         {
729             result = OC_STACK_ERROR;
730         }
731
732         return result;
733     }
734
735     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
736             OCClientResponse* clientResponse)
737     {
738         ClientCallbackContext::SubscribePresenceContext* context =
739         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
740
741         /*
742          * This a hack while we rethink presence subscription.
743          */
744         std::string url = clientResponse->devAddr.addr;
745
746         std::thread exec(context->callback, clientResponse->result,
747                     clientResponse->sequenceNumber, url);
748
749         exec.detach();
750
751         return OC_STACK_KEEP_TRANSACTION;
752     }
753
754     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
755         const std::string& host, const std::string& resourceType,
756         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
757     {
758         if(!presenceHandler)
759         {
760             return OC_STACK_INVALID_PARAM;
761         }
762
763         ClientCallbackContext::SubscribePresenceContext* ctx =
764             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
765         OCCallbackData cbdata(
766                 static_cast<void*>(ctx),
767                 subscribePresenceCallback,
768                 [](void* c)
769                 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
770                 );
771
772         auto cLock = m_csdkLock.lock();
773
774         std::ostringstream os;
775         os << host << OC_PRESENCE_URI;;
776
777         if(!resourceType.empty())
778         {
779             os << "?rt=" << resourceType;
780         }
781
782         if(!cLock)
783         {
784             delete ctx;
785             return OC_STACK_ERROR;
786         }
787
788         return OCDoResource(handle, OC_REST_PRESENCE,
789                             os.str().c_str(), nullptr,
790                             nullptr, connectivityType,
791                             OC_LOW_QOS, &cbdata, NULL, 0);
792     }
793
794     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
795     {
796         OCStackResult result;
797         auto cLock = m_csdkLock.lock();
798
799         if(cLock)
800         {
801             std::lock_guard<std::recursive_mutex> lock(*cLock);
802             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
803         }
804         else
805         {
806             result = OC_STACK_ERROR;
807         }
808
809         return result;
810     }
811
812     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
813     {
814         qos = m_cfg.QoS;
815         return OC_STACK_OK;
816     }
817
818     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
819            const HeaderOptions& headerOptions)
820     {
821         int i = 0;
822
823         if( headerOptions.size() == 0)
824         {
825             return nullptr;
826         }
827
828         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
829         {
830             options[i] = OCHeaderOption(OC_COAP_ID,
831                     it->getOptionID(),
832                     it->getOptionData().length() + 1,
833                     reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));
834             i++;
835         }
836
837         return options;
838     }
839 }