Merge branch 'master' into resource-manipulation
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCTransportFlags serverFlags =
42                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
43             OCTransportFlags clientFlags =
44                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
45             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
46
47             if(OC_STACK_OK != result)
48             {
49                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
50             }
51
52             m_threadRun = true;
53             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
54         }
55     }
56
57     InProcClientWrapper::~InProcClientWrapper()
58     {
59         if(m_threadRun && m_listeningThread.joinable())
60         {
61             m_threadRun = false;
62             m_listeningThread.join();
63         }
64
65         // only stop if we are the ones who actually called 'init'.  We are counting
66         // on the server to do the stop.
67         if(m_cfg.mode == ModeType::Client)
68         {
69             OCStop();
70         }
71     }
72
73     void InProcClientWrapper::listeningFunc()
74     {
75         while(m_threadRun)
76         {
77             OCStackResult result;
78             auto cLock = m_csdkLock.lock();
79             if(cLock)
80             {
81                 std::lock_guard<std::recursive_mutex> lock(*cLock);
82                 result = OCProcess();
83             }
84             else
85             {
86                 result = OC_STACK_ERROR;
87             }
88
89             if(result != OC_STACK_OK)
90             {
91                 // TODO: do something with result if failed?
92             }
93
94             // To minimize CPU utilization we may wish to do this with sleep
95             std::this_thread::sleep_for(std::chrono::milliseconds(10));
96         }
97     }
98
99     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
100     {
101         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
102         {
103             return OCRepresentation();
104         }
105
106         MessageContainer oc;
107         try
108         {
109             oc.setJSONRepresentation(clientResponse->resJSONPayload);
110         }
111         catch (cereal::RapidJSONException& ex)
112         {
113             oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
114                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
115             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
116         }
117         catch (cereal::Exception& ex)
118         {
119             oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
120                 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
121             throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
122         }
123
124         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
125         if(it == oc.representations().end())
126         {
127             return OCRepresentation();
128         }
129
130         // first one is considered the root, everything else is considered a child of this one.
131         OCRepresentation root = *it;
132         ++it;
133
134         std::for_each(it, oc.representations().end(),
135                 [&root](const OCRepresentation& repItr)
136                 {root.addChild(repItr);});
137         return root;
138
139     }
140
141     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
142         OCClientResponse* clientResponse)
143     {
144         ClientCallbackContext::ListenContext* context =
145             static_cast<ClientCallbackContext::ListenContext*>(ctx);
146
147         if(clientResponse->result != OC_STACK_OK)
148         {
149             oclog() << "listenCallback(): failed to create resource. clientResponse: "
150                     << clientResponse->result
151                     << std::flush;
152
153             return OC_STACK_KEEP_TRANSACTION;
154         }
155
156         auto clientWrapper = context->clientWrapper.lock();
157
158         if(!clientWrapper)
159         {
160             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
161                     << std::flush;
162             return OC_STACK_KEEP_TRANSACTION;
163         }
164
165         std::stringstream requestStream;
166         requestStream << clientResponse->resJSONPayload;
167
168         try
169         {
170
171             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
172                                     requestStream);
173             // loop to ensure valid construction of all resources
174             for(auto resource : container.Resources())
175             {
176                 std::thread exec(context->callback, resource);
177                 exec.detach();
178             }
179
180         }
181         catch(const std::exception& e)
182         {
183             oclog() << "listenCallback failed to parse a malformed message: "
184                     << e.what()
185                     << std::endl
186                     << clientResponse->resJSONPayload
187                     << std::endl
188                     << clientResponse->result
189                     << std::flush;
190             return OC_STACK_KEEP_TRANSACTION;
191         }
192
193         return OC_STACK_KEEP_TRANSACTION;
194     }
195
196     OCStackResult InProcClientWrapper::ListenForResource(
197             const std::string& serviceUrl,  // unused
198             const std::string& resourceType,
199             OCConnectivityType connectivityType,
200             FindCallback& callback, QualityOfService QoS)
201     {
202         if(!callback)
203         {
204             return OC_STACK_INVALID_PARAM;
205         }
206
207         OCStackResult result;
208
209         ClientCallbackContext::ListenContext* context =
210             new ClientCallbackContext::ListenContext(callback, shared_from_this());
211         OCCallbackData cbdata(
212                 static_cast<void*>(context),
213                 listenCallback,
214                 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
215             );
216
217         auto cLock = m_csdkLock.lock();
218         if(cLock)
219         {
220             std::lock_guard<std::recursive_mutex> lock(*cLock);
221             result = OCDoResource(nullptr, OC_REST_DISCOVER,
222                                   resourceType.c_str(),
223                                   nullptr, nullptr, connectivityType,
224                                   static_cast<OCQualityOfService>(QoS),
225                                   &cbdata,
226                                   nullptr, 0);
227         }
228         else
229         {
230             delete context;
231             result = OC_STACK_ERROR;
232         }
233         return result;
234     }
235
236     OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
237             OCClientResponse* clientResponse)
238     {
239         ClientCallbackContext::DeviceListenContext* context =
240             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
241
242         try
243         {
244             OCRepresentation rep = parseGetSetCallback(clientResponse);
245             std::thread exec(context->callback, rep);
246             exec.detach();
247         }
248         catch(OC::OCException& e)
249         {
250             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
251                 <<e.what() <<std::flush;
252         }
253
254         return OC_STACK_KEEP_TRANSACTION;
255     }
256
257     OCStackResult InProcClientWrapper::ListenForDevice(
258             const std::string& serviceUrl,  // unused
259             const std::string& deviceURI,
260             OCConnectivityType connectivityType,
261             FindDeviceCallback& callback,
262             QualityOfService QoS)
263     {
264         if(!callback)
265         {
266             return OC_STACK_INVALID_PARAM;
267         }
268         OCStackResult result;
269
270         ClientCallbackContext::DeviceListenContext* context =
271             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
272         OCCallbackData cbdata(
273                 static_cast<void*>(context),
274                 listenDeviceCallback,
275                 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
276                 );
277
278         auto cLock = m_csdkLock.lock();
279         if(cLock)
280         {
281             std::lock_guard<std::recursive_mutex> lock(*cLock);
282             result = OCDoResource(nullptr, OC_REST_DISCOVER,
283                                   deviceURI.c_str(),
284                                   nullptr, nullptr, connectivityType,
285                                   static_cast<OCQualityOfService>(QoS),
286                                   &cbdata,
287                                   nullptr, 0);
288         }
289         else
290         {
291             delete context;
292             result = OC_STACK_ERROR;
293         }
294         return result;
295     }
296
297     void parseServerHeaderOptions(OCClientResponse* clientResponse,
298                     HeaderOptions& serverHeaderOptions)
299     {
300         if(clientResponse)
301         {
302             // Parse header options from server
303             uint16_t optionID;
304             std::string optionData;
305
306             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
307             {
308                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
309                 optionData = reinterpret_cast<const char*>
310                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
311                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
312                 serverHeaderOptions.push_back(headerOption);
313             }
314         }
315         else
316         {
317             // clientResponse is invalid
318             // TODO check proper logging
319             std::cout << " Invalid response " << std::endl;
320         }
321     }
322
323     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
324         OCClientResponse* clientResponse)
325     {
326         ClientCallbackContext::GetContext* context =
327             static_cast<ClientCallbackContext::GetContext*>(ctx);
328
329         OCRepresentation rep;
330         HeaderOptions serverHeaderOptions;
331         OCStackResult result = clientResponse->result;
332         if(result == OC_STACK_OK)
333         {
334             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
335             try
336             {
337                 rep = parseGetSetCallback(clientResponse);
338             }
339             catch(OC::OCException& e)
340             {
341                 result = e.code();
342             }
343         }
344
345         std::thread exec(context->callback, serverHeaderOptions, rep, result);
346         exec.detach();
347         return OC_STACK_DELETE_TRANSACTION;
348     }
349
350     OCStackResult InvokeDoResource(OCDoHandle *handle,
351                                     OCMethod method,
352                                     const char *requestUri,
353                                     const OCDevAddr *destination,
354                                     const char *request,
355                                     OCConnectivityType connectivityType,
356                                     OCQualityOfService qos,
357                                     OCCallbackData *cbData,
358                                     OCHeaderOption *options,
359                                     uint8_t numOptions,
360                                     bool useHostString)
361     {
362         if (useHostString)
363         {
364             ostringstream host;   
365             host << destination << requestUri;
366             connectivityType = (OCConnectivityType)
367                                 ((destination->adapter << CT_ADAPTER_SHIFT)
368                                 | (destination->flags & CT_MASK_FLAGS));
369             return OCDoResource(handle,
370                                 method,
371                                 host.str().c_str(),
372                                 nullptr,
373                                 request,
374                                 connectivityType,
375                                 qos,
376                                 cbData,
377                                 options,
378                                 numOptions);
379         }
380         else
381         {
382             return OCDoResource(handle,
383                                 method,
384                                 requestUri,
385                                 destination,
386                                 request,
387                                 connectivityType,
388                                 qos,
389                                 cbData,
390                                 options,
391                                 numOptions);
392         }
393     }
394
395     OCStackResult InProcClientWrapper::GetResourceRepresentation(
396         const OCDevAddr& devAddr, bool useHostString,
397         const std::string& resourceUri,
398         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
399         GetCallback& callback, QualityOfService QoS)
400     {
401         if(!callback)
402         {
403             return OC_STACK_INVALID_PARAM;
404         }
405         OCStackResult result;
406         ClientCallbackContext::GetContext* ctx =
407             new ClientCallbackContext::GetContext(callback);
408         OCCallbackData cbdata(
409                 static_cast<void*>(ctx),
410                 getResourceCallback,
411                 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
412                 );
413
414         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
415
416         auto cLock = m_csdkLock.lock();
417
418         if(cLock)
419         {
420             std::lock_guard<std::recursive_mutex> lock(*cLock);
421             OCHeaderOption options[MAX_HEADER_OPTIONS];
422
423             result = OCDoResource(nullptr, OC_REST_GET,
424                                   uri.c_str(),
425                                   &devAddr, nullptr,
426                                   CT_DEFAULT,
427                                   static_cast<OCQualityOfService>(QoS),
428                                   &cbdata,
429                                   assembleHeaderOptions(options, headerOptions),
430                                   headerOptions.size());
431         }
432         else
433         {
434             delete ctx;
435             result = OC_STACK_ERROR;
436         }
437         return result;
438     }
439
440
441     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
442         OCClientResponse* clientResponse)
443     {
444         ClientCallbackContext::SetContext* context =
445             static_cast<ClientCallbackContext::SetContext*>(ctx);
446         OCRepresentation attrs;
447         HeaderOptions serverHeaderOptions;
448
449         OCStackResult result = clientResponse->result;
450         if (OC_STACK_OK               == result ||
451             OC_STACK_RESOURCE_CREATED == result ||
452             OC_STACK_RESOURCE_DELETED == result)
453         {
454             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
455             try
456             {
457                 attrs = parseGetSetCallback(clientResponse);
458             }
459             catch(OC::OCException& e)
460             {
461                 result = e.code();
462             }
463         }
464
465         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
466         exec.detach();
467         return OC_STACK_DELETE_TRANSACTION;
468     }
469
470     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
471         const QueryParamsMap& queryParams)
472     {
473         if(uri.back() == '/')
474         {
475             uri.resize(uri.size()-1);
476         }
477
478         ostringstream paramsList;
479         if(queryParams.size() > 0)
480         {
481             paramsList << '?';
482         }
483
484         for(auto& param : queryParams)
485         {
486             paramsList << param.first <<'='<<param.second<<';';
487         }
488
489         std::string queryString = paramsList.str();
490         if(queryString.back() == ';')
491         {
492             queryString.resize(queryString.size() - 1);
493         }
494
495         std::string ret = uri + queryString;
496         return ret;
497     }
498
499     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
500     {
501         MessageContainer ocInfo;
502         ocInfo.addRepresentation(rep);
503         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
504     }
505
506     OCStackResult InProcClientWrapper::PostResourceRepresentation(
507         const OCDevAddr& devAddr, bool useHostString,
508         const std::string& uri,
509         const OCRepresentation& rep,
510         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
511         PostCallback& callback, QualityOfService QoS)
512     {
513         if(!callback)
514         {
515             return OC_STACK_INVALID_PARAM;
516         }
517         OCStackResult result;
518         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
519         OCCallbackData cbdata(
520                 static_cast<void*>(ctx),
521                 setResourceCallback,
522                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
523                 );
524
525         std::string url = assembleSetResourceUri(uri, queryParams);
526
527         auto cLock = m_csdkLock.lock();
528
529         if(cLock)
530         {
531             std::lock_guard<std::recursive_mutex> lock(*cLock);
532             OCHeaderOption options[MAX_HEADER_OPTIONS];
533
534             result = OCDoResource(nullptr, OC_REST_POST,
535                                   url.c_str(), &devAddr,
536                                   assembleSetResourcePayload(rep).c_str(),
537                                   CT_DEFAULT,
538                                   static_cast<OCQualityOfService>(QoS),
539                                   &cbdata,
540                                   assembleHeaderOptions(options, headerOptions),
541                                   headerOptions.size());
542         }
543         else
544         {
545             delete ctx;
546             result = OC_STACK_ERROR;
547         }
548
549         return result;
550     }
551
552     OCStackResult InProcClientWrapper::PutResourceRepresentation(
553         const OCDevAddr& devAddr, bool useHostString,
554         const std::string& uri,
555         const OCRepresentation& rep,
556         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
557         PutCallback& callback, QualityOfService QoS)
558     {
559         if(!callback)
560         {
561             return OC_STACK_INVALID_PARAM;
562         }
563         OCStackResult result;
564         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
565         OCCallbackData cbdata(
566                 static_cast<void*>(ctx),
567                 setResourceCallback,
568                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
569                 );
570
571         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
572
573         auto cLock = m_csdkLock.lock();
574
575         if(cLock)
576         {
577             std::lock_guard<std::recursive_mutex> lock(*cLock);
578             OCDoHandle handle;
579             OCHeaderOption options[MAX_HEADER_OPTIONS];
580
581             result = OCDoResource(&handle, OC_REST_PUT,
582                                   url.c_str(), &devAddr,
583                                   assembleSetResourcePayload(rep).c_str(),
584                                   CT_DEFAULT,
585                                   static_cast<OCQualityOfService>(QoS),
586                                   &cbdata,
587                                   assembleHeaderOptions(options, headerOptions),
588                                   headerOptions.size());
589         }
590         else
591         {
592             delete ctx;
593             result = OC_STACK_ERROR;
594         }
595
596         return result;
597     }
598
599     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
600         OCClientResponse* clientResponse)
601     {
602         ClientCallbackContext::DeleteContext* context =
603             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
604         HeaderOptions serverHeaderOptions;
605
606         if(clientResponse->result == OC_STACK_OK)
607         {
608             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
609         }
610         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
611         exec.detach();
612         return OC_STACK_DELETE_TRANSACTION;
613     }
614
615     OCStackResult InProcClientWrapper::DeleteResource(
616         const OCDevAddr& devAddr, bool useHostString,
617         const std::string& uri,
618         const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
619     {
620         if(!callback)
621         {
622             return OC_STACK_INVALID_PARAM;
623         }
624         OCStackResult result;
625         ClientCallbackContext::DeleteContext* ctx =
626             new ClientCallbackContext::DeleteContext(callback);
627         OCCallbackData cbdata(
628                 static_cast<void*>(ctx),
629                 deleteResourceCallback,
630                 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
631                 );
632
633         auto cLock = m_csdkLock.lock();
634
635         if(cLock)
636         {
637             OCHeaderOption options[MAX_HEADER_OPTIONS];
638
639             std::lock_guard<std::recursive_mutex> lock(*cLock);
640
641             result = OCDoResource(nullptr, OC_REST_DELETE,
642                                   uri.c_str(), &devAddr,
643                                   nullptr,
644                                   CT_DEFAULT,
645                                   static_cast<OCQualityOfService>(m_cfg.QoS),
646                                   &cbdata,
647                                   assembleHeaderOptions(options, headerOptions),
648                                   headerOptions.size());
649         }
650         else
651         {
652             delete ctx;
653             result = OC_STACK_ERROR;
654         }
655
656         return result;
657     }
658
659     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
660         OCClientResponse* clientResponse)
661     {
662         ClientCallbackContext::ObserveContext* context =
663             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
664         OCRepresentation attrs;
665         HeaderOptions serverHeaderOptions;
666         uint32_t sequenceNumber = clientResponse->sequenceNumber;
667         OCStackResult result = clientResponse->result;
668         if(clientResponse->result == OC_STACK_OK)
669         {
670             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
671             try
672             {
673                 attrs = parseGetSetCallback(clientResponse);
674             }
675             catch(OC::OCException& e)
676             {
677                 result = e.code();
678             }
679         }
680         std::thread exec(context->callback, serverHeaderOptions, attrs,
681                     result, sequenceNumber);
682         exec.detach();
683         if(sequenceNumber == OC_OBSERVE_DEREGISTER)
684         {
685             return OC_STACK_DELETE_TRANSACTION;
686         }
687         return OC_STACK_KEEP_TRANSACTION;
688     }
689
690     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
691         const OCDevAddr& devAddr, bool useHostString,
692         const std::string& uri,
693         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
694         ObserveCallback& callback, QualityOfService QoS)
695     {
696         if(!callback)
697         {
698             return OC_STACK_INVALID_PARAM;
699         }
700         OCStackResult result;
701
702         ClientCallbackContext::ObserveContext* ctx =
703             new ClientCallbackContext::ObserveContext(callback);
704         OCCallbackData cbdata(
705                 static_cast<void*>(ctx),
706                 observeResourceCallback,
707                 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
708                 );
709
710         OCMethod method;
711         if (observeType == ObserveType::Observe)
712         {
713             method = OC_REST_OBSERVE;
714         }
715         else if (observeType == ObserveType::ObserveAll)
716         {
717             method = OC_REST_OBSERVE_ALL;
718         }
719         else
720         {
721             method = OC_REST_OBSERVE_ALL;
722         }
723
724         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
725
726         auto cLock = m_csdkLock.lock();
727
728         if(cLock)
729         {
730             std::lock_guard<std::recursive_mutex> lock(*cLock);
731             OCHeaderOption options[MAX_HEADER_OPTIONS];
732
733             result = OCDoResource(handle, method,
734                                   url.c_str(), &devAddr,
735                                   nullptr,
736                                   CT_DEFAULT,
737                                   static_cast<OCQualityOfService>(QoS),
738                                   &cbdata,
739                                   assembleHeaderOptions(options, headerOptions),
740                                   headerOptions.size());
741         }
742         else
743         {
744             delete ctx;
745             return OC_STACK_ERROR;
746         }
747
748         return result;
749     }
750
751     OCStackResult InProcClientWrapper::CancelObserveResource(
752             OCDoHandle handle,
753             const std::string& host, // unused
754             const std::string& uri,  // unused
755             const HeaderOptions& headerOptions,
756             QualityOfService QoS)
757     {
758         OCStackResult result;
759         auto cLock = m_csdkLock.lock();
760
761         if(cLock)
762         {
763             std::lock_guard<std::recursive_mutex> lock(*cLock);
764             OCHeaderOption options[MAX_HEADER_OPTIONS];
765
766             result = OCCancel(handle,
767                     static_cast<OCQualityOfService>(QoS),
768                     assembleHeaderOptions(options, headerOptions),
769                     headerOptions.size());
770         }
771         else
772         {
773             result = OC_STACK_ERROR;
774         }
775
776         return result;
777     }
778
779     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
780             OCClientResponse* clientResponse)
781     {
782         ClientCallbackContext::SubscribePresenceContext* context =
783         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
784
785         /*
786          * This a hack while we rethink presence subscription.
787          */
788         std::string url = clientResponse->devAddr.addr;
789
790         std::thread exec(context->callback, clientResponse->result,
791                     clientResponse->sequenceNumber, url);
792
793         exec.detach();
794
795         return OC_STACK_KEEP_TRANSACTION;
796     }
797
798     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
799         const std::string& host, const std::string& resourceType,
800         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
801     {
802         if(!presenceHandler)
803         {
804             return OC_STACK_INVALID_PARAM;
805         }
806
807         ClientCallbackContext::SubscribePresenceContext* ctx =
808             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
809         OCCallbackData cbdata(
810                 static_cast<void*>(ctx),
811                 subscribePresenceCallback,
812                 [](void* c)
813                 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
814                 );
815
816         auto cLock = m_csdkLock.lock();
817
818         std::ostringstream os;
819         os << host << "/oc/presence";
820
821         if(!resourceType.empty())
822         {
823             os << "?rt=" << resourceType;
824         }
825
826         if(!cLock)
827         {
828             delete ctx;
829             return OC_STACK_ERROR;
830         }
831
832         return OCDoResource(handle, OC_REST_PRESENCE,
833                             os.str().c_str(), nullptr,
834                             nullptr, connectivityType,
835                             OC_LOW_QOS, &cbdata, NULL, 0);
836     }
837
838     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
839     {
840         OCStackResult result;
841         auto cLock = m_csdkLock.lock();
842
843         if(cLock)
844         {
845             std::lock_guard<std::recursive_mutex> lock(*cLock);
846             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
847         }
848         else
849         {
850             result = OC_STACK_ERROR;
851         }
852
853         return result;
854     }
855
856     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
857     {
858         qos = m_cfg.QoS;
859         return OC_STACK_OK;
860     }
861
862     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
863            const HeaderOptions& headerOptions)
864     {
865         int i = 0;
866
867         if( headerOptions.size() == 0)
868         {
869             return nullptr;
870         }
871
872         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
873         {
874             options[i] = OCHeaderOption(OC_COAP_ID,
875                     it->getOptionID(),
876                     it->getOptionData().length() + 1,
877                     reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));
878             i++;
879         }
880
881         return options;
882     }
883 }