Merge branch 'master' into notification-service
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try{
169             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
170                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
171             // loop to ensure valid construction of all resources
172             for(auto resource : container.Resources())
173             {
174                 std::thread exec(context->callback, resource);
175                 exec.detach();
176             }
177         }
178         catch (std::exception &e){
179             oclog() << "Exception in listCallback, ignoring response: "
180                     << e.what() << std::flush;
181         }
182
183
184         return OC_STACK_KEEP_TRANSACTION;
185     }
186
187     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
188         OCClientResponse* clientResponse)
189     {
190         if (!ctx || !clientResponse)
191         {
192             return OC_STACK_KEEP_TRANSACTION;
193         }
194
195         ClientCallbackContext::ListenErrorContext* context =
196             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
197         if (!context)
198         {
199             return OC_STACK_KEEP_TRANSACTION;
200         }
201
202         OCStackResult result = clientResponse->result;
203         if (result == OC_STACK_OK)
204         {
205             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
206             {
207                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
208                     << std::flush;
209                 return OC_STACK_KEEP_TRANSACTION;
210             }
211
212             auto clientWrapper = context->clientWrapper.lock();
213
214             if (!clientWrapper)
215             {
216                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
217                         << std::flush;
218                 return OC_STACK_KEEP_TRANSACTION;
219             }
220
221             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
222                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
223             // loop to ensure valid construction of all resources
224             for (auto resource : container.Resources())
225             {
226                 std::thread exec(context->callback, resource);
227                 exec.detach();
228             }
229             return OC_STACK_KEEP_TRANSACTION;
230         }
231
232         std::string resourceURI = clientResponse->resourceUri;
233         std::thread exec(context->errorCallback, resourceURI, result);
234         exec.detach();
235         return OC_STACK_DELETE_TRANSACTION;
236     }
237
238     OCStackResult InProcClientWrapper::ListenForResource(
239             const std::string& serviceUrl,
240             const std::string& resourceType,
241             OCConnectivityType connectivityType,
242             FindCallback& callback, QualityOfService QoS)
243     {
244         if (!callback)
245         {
246             return OC_STACK_INVALID_PARAM;
247         }
248
249         OCStackResult result;
250         ostringstream resourceUri;
251         resourceUri << serviceUrl << resourceType;
252
253         ClientCallbackContext::ListenContext* context =
254             new ClientCallbackContext::ListenContext(callback, shared_from_this());
255         OCCallbackData cbdata(
256                 static_cast<void*>(context),
257                 listenCallback,
258                 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
259             );
260
261         auto cLock = m_csdkLock.lock();
262         if (cLock)
263         {
264             std::lock_guard<std::recursive_mutex> lock(*cLock);
265             result = OCDoResource(nullptr, OC_REST_DISCOVER,
266                                   resourceUri.str().c_str(),
267                                   nullptr, nullptr, connectivityType,
268                                   static_cast<OCQualityOfService>(QoS),
269                                   &cbdata,
270                                   nullptr, 0);
271         }
272         else
273         {
274             delete context;
275             result = OC_STACK_ERROR;
276         }
277         return result;
278     }
279
280     OCStackResult InProcClientWrapper::ListenErrorForResource(
281             const std::string& serviceUrl,
282             const std::string& resourceType,
283             OCConnectivityType connectivityType,
284             FindCallback& callback, FindErrorCallback& errorCallback,
285             QualityOfService QoS)
286     {
287         if (!callback)
288         {
289             return OC_STACK_INVALID_PARAM;
290         }
291
292         ostringstream resourceUri;
293         resourceUri << serviceUrl << resourceType;
294
295         ClientCallbackContext::ListenErrorContext* context =
296             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
297                                                           shared_from_this());
298         if (!context)
299         {
300             return OC_STACK_ERROR;
301         }
302
303         OCCallbackData cbdata(
304                 static_cast<void*>(context),
305                 listenErrorCallback,
306                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
307             );
308
309         OCStackResult result;
310         auto cLock = m_csdkLock.lock();
311         if (cLock)
312         {
313             std::lock_guard<std::recursive_mutex> lock(*cLock);
314             result = OCDoResource(nullptr, OC_REST_DISCOVER,
315                                   resourceUri.str().c_str(),
316                                   nullptr, nullptr, connectivityType,
317                                   static_cast<OCQualityOfService>(QoS),
318                                   &cbdata,
319                                   nullptr, 0);
320         }
321         else
322         {
323             delete context;
324             result = OC_STACK_ERROR;
325         }
326         return result;
327     }
328
329     OCStackApplicationResult listenDeviceCallback(void* ctx,
330                                                   OCDoHandle /*handle*/,
331             OCClientResponse* clientResponse)
332     {
333         ClientCallbackContext::DeviceListenContext* context =
334             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
335
336         try
337         {
338             OCRepresentation rep = parseGetSetCallback(clientResponse);
339             std::thread exec(context->callback, rep);
340             exec.detach();
341         }
342         catch(OC::OCException& e)
343         {
344             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
345                 <<e.what() <<std::flush;
346         }
347
348         return OC_STACK_KEEP_TRANSACTION;
349     }
350
351     OCStackResult InProcClientWrapper::ListenForDevice(
352             const std::string& serviceUrl,
353             const std::string& deviceURI,
354             OCConnectivityType connectivityType,
355             FindDeviceCallback& callback,
356             QualityOfService QoS)
357     {
358         if (!callback)
359         {
360             return OC_STACK_INVALID_PARAM;
361         }
362         OCStackResult result;
363         ostringstream deviceUri;
364         deviceUri << serviceUrl << deviceURI;
365
366         ClientCallbackContext::DeviceListenContext* context =
367             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
368         OCCallbackData cbdata(
369                 static_cast<void*>(context),
370                 listenDeviceCallback,
371                 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
372                 );
373
374         auto cLock = m_csdkLock.lock();
375         if (cLock)
376         {
377             std::lock_guard<std::recursive_mutex> lock(*cLock);
378             result = OCDoResource(nullptr, OC_REST_DISCOVER,
379                                   deviceUri.str().c_str(),
380                                   nullptr, nullptr, connectivityType,
381                                   static_cast<OCQualityOfService>(QoS),
382                                   &cbdata,
383                                   nullptr, 0);
384         }
385         else
386         {
387             delete context;
388             result = OC_STACK_ERROR;
389         }
390         return result;
391     }
392
393     void parseServerHeaderOptions(OCClientResponse* clientResponse,
394                     HeaderOptions& serverHeaderOptions)
395     {
396         if (clientResponse)
397         {
398             // Parse header options from server
399             uint16_t optionID;
400             std::string optionData;
401
402             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
403             {
404                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
405                 optionData = reinterpret_cast<const char*>
406                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
407                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
408                 serverHeaderOptions.push_back(headerOption);
409             }
410         }
411         else
412         {
413             // clientResponse is invalid
414             // TODO check proper logging
415             std::cout << " Invalid response " << std::endl;
416         }
417     }
418
419     OCStackApplicationResult getResourceCallback(void* ctx,
420                                                  OCDoHandle /*handle*/,
421         OCClientResponse* clientResponse)
422     {
423         ClientCallbackContext::GetContext* context =
424             static_cast<ClientCallbackContext::GetContext*>(ctx);
425
426         OCRepresentation rep;
427         HeaderOptions serverHeaderOptions;
428         OCStackResult result = clientResponse->result;
429         if (result == OC_STACK_OK)
430         {
431             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
432             try
433             {
434                 rep = parseGetSetCallback(clientResponse);
435             }
436             catch(OC::OCException& e)
437             {
438                 result = e.code();
439             }
440         }
441
442         std::thread exec(context->callback, serverHeaderOptions, rep, result);
443         exec.detach();
444         return OC_STACK_DELETE_TRANSACTION;
445     }
446
447     OCStackResult InProcClientWrapper::GetResourceRepresentation(
448         const OCDevAddr& devAddr,
449         const std::string& resourceUri,
450         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
451         GetCallback& callback, QualityOfService QoS)
452     {
453         if (!callback)
454         {
455             return OC_STACK_INVALID_PARAM;
456         }
457         OCStackResult result;
458         ClientCallbackContext::GetContext* ctx =
459             new ClientCallbackContext::GetContext(callback);
460         OCCallbackData cbdata(
461                 static_cast<void*>(ctx),
462                 getResourceCallback,
463                 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
464                 );
465
466         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
467
468         auto cLock = m_csdkLock.lock();
469
470         if (cLock)
471         {
472             std::lock_guard<std::recursive_mutex> lock(*cLock);
473             OCHeaderOption options[MAX_HEADER_OPTIONS];
474
475             result = OCDoResource(
476                                   nullptr, OC_REST_GET,
477                                   uri.c_str(),
478                                   &devAddr, nullptr,
479                                   CT_DEFAULT,
480                                   static_cast<OCQualityOfService>(QoS),
481                                   &cbdata,
482                                   assembleHeaderOptions(options, headerOptions),
483                                   headerOptions.size());
484         }
485         else
486         {
487             delete ctx;
488             result = OC_STACK_ERROR;
489         }
490         return result;
491     }
492
493
494     OCStackApplicationResult setResourceCallback(void* ctx,
495                                                  OCDoHandle /*handle*/,
496         OCClientResponse* clientResponse)
497     {
498         ClientCallbackContext::SetContext* context =
499             static_cast<ClientCallbackContext::SetContext*>(ctx);
500         OCRepresentation attrs;
501         HeaderOptions serverHeaderOptions;
502
503         OCStackResult result = clientResponse->result;
504         if (OC_STACK_OK               == result ||
505             OC_STACK_RESOURCE_CREATED == result ||
506             OC_STACK_RESOURCE_DELETED == result)
507         {
508             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
509             try
510             {
511                 attrs = parseGetSetCallback(clientResponse);
512             }
513             catch(OC::OCException& e)
514             {
515                 result = e.code();
516             }
517         }
518
519         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
520         exec.detach();
521         return OC_STACK_DELETE_TRANSACTION;
522     }
523
524     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
525         const QueryParamsMap& queryParams)
526     {
527         if (uri.back() == '/')
528         {
529             uri.resize(uri.size()-1);
530         }
531
532         ostringstream paramsList;
533         if (queryParams.size() > 0)
534         {
535             paramsList << '?';
536         }
537
538         for(auto& param : queryParams)
539         {
540             paramsList << param.first <<'='<<param.second<<';';
541         }
542
543         std::string queryString = paramsList.str();
544         if (queryString.back() == ';')
545         {
546             queryString.resize(queryString.size() - 1);
547         }
548
549         std::string ret = uri + queryString;
550         return ret;
551     }
552
553     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
554     {
555         MessageContainer ocInfo;
556         ocInfo.addRepresentation(rep);
557         for(const OCRepresentation& r : rep.getChildren())
558         {
559             ocInfo.addRepresentation(r);
560         }
561
562         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
563     }
564
565     OCStackResult InProcClientWrapper::PostResourceRepresentation(
566         const OCDevAddr& devAddr,
567         const std::string& uri,
568         const OCRepresentation& rep,
569         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
570         PostCallback& callback, QualityOfService QoS)
571     {
572         if (!callback)
573         {
574             return OC_STACK_INVALID_PARAM;
575         }
576         OCStackResult result;
577         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
578         OCCallbackData cbdata(
579                 static_cast<void*>(ctx),
580                 setResourceCallback,
581                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
582                 );
583
584         std::string url = assembleSetResourceUri(uri, queryParams);
585
586         auto cLock = m_csdkLock.lock();
587
588         if (cLock)
589         {
590             std::lock_guard<std::recursive_mutex> lock(*cLock);
591             OCHeaderOption options[MAX_HEADER_OPTIONS];
592
593             result = OCDoResource(nullptr, OC_REST_POST,
594                                   url.c_str(), &devAddr,
595                                   assembleSetResourcePayload(rep),
596                                   CT_DEFAULT,
597                                   static_cast<OCQualityOfService>(QoS),
598                                   &cbdata,
599                                   assembleHeaderOptions(options, headerOptions),
600                                   headerOptions.size());
601         }
602         else
603         {
604             delete ctx;
605             result = OC_STACK_ERROR;
606         }
607
608         return result;
609     }
610
611     OCStackResult InProcClientWrapper::PutResourceRepresentation(
612         const OCDevAddr& devAddr,
613         const std::string& uri,
614         const OCRepresentation& rep,
615         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
616         PutCallback& callback, QualityOfService QoS)
617     {
618         if (!callback)
619         {
620             return OC_STACK_INVALID_PARAM;
621         }
622         OCStackResult result;
623         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
624         OCCallbackData cbdata(
625                 static_cast<void*>(ctx),
626                 setResourceCallback,
627                 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
628                 );
629
630         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
631
632         auto cLock = m_csdkLock.lock();
633
634         if (cLock)
635         {
636             std::lock_guard<std::recursive_mutex> lock(*cLock);
637             OCDoHandle handle;
638             OCHeaderOption options[MAX_HEADER_OPTIONS];
639
640             result = OCDoResource(&handle, OC_REST_PUT,
641                                   url.c_str(), &devAddr,
642                                   assembleSetResourcePayload(rep),
643                                   CT_DEFAULT,
644                                   static_cast<OCQualityOfService>(QoS),
645                                   &cbdata,
646                                   assembleHeaderOptions(options, headerOptions),
647                                   headerOptions.size());
648         }
649         else
650         {
651             delete ctx;
652             result = OC_STACK_ERROR;
653         }
654
655         return result;
656     }
657
658     OCStackApplicationResult deleteResourceCallback(void* ctx,
659                                                     OCDoHandle /*handle*/,
660         OCClientResponse* clientResponse)
661     {
662         ClientCallbackContext::DeleteContext* context =
663             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
664         HeaderOptions serverHeaderOptions;
665
666         if (clientResponse->result == OC_STACK_OK)
667         {
668             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
669         }
670         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
671         exec.detach();
672         return OC_STACK_DELETE_TRANSACTION;
673     }
674
675     OCStackResult InProcClientWrapper::DeleteResource(
676         const OCDevAddr& devAddr,
677         const std::string& uri,
678         const HeaderOptions& headerOptions,
679         DeleteCallback& callback,
680         QualityOfService /*QoS*/)
681     {
682         if (!callback)
683         {
684             return OC_STACK_INVALID_PARAM;
685         }
686         OCStackResult result;
687         ClientCallbackContext::DeleteContext* ctx =
688             new ClientCallbackContext::DeleteContext(callback);
689         OCCallbackData cbdata(
690                 static_cast<void*>(ctx),
691                 deleteResourceCallback,
692                 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
693                 );
694
695         auto cLock = m_csdkLock.lock();
696
697         if (cLock)
698         {
699             OCHeaderOption options[MAX_HEADER_OPTIONS];
700
701             std::lock_guard<std::recursive_mutex> lock(*cLock);
702
703             result = OCDoResource(nullptr, OC_REST_DELETE,
704                                   uri.c_str(), &devAddr,
705                                   nullptr,
706                                   CT_DEFAULT,
707                                   static_cast<OCQualityOfService>(m_cfg.QoS),
708                                   &cbdata,
709                                   assembleHeaderOptions(options, headerOptions),
710                                   headerOptions.size());
711         }
712         else
713         {
714             delete ctx;
715             result = OC_STACK_ERROR;
716         }
717
718         return result;
719     }
720
721     OCStackApplicationResult observeResourceCallback(void* ctx,
722                                                      OCDoHandle /*handle*/,
723         OCClientResponse* clientResponse)
724     {
725         ClientCallbackContext::ObserveContext* context =
726             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
727         OCRepresentation attrs;
728         HeaderOptions serverHeaderOptions;
729         uint32_t sequenceNumber = clientResponse->sequenceNumber;
730         OCStackResult result = clientResponse->result;
731         if (clientResponse->result == OC_STACK_OK)
732         {
733             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
734             try
735             {
736                 attrs = parseGetSetCallback(clientResponse);
737             }
738             catch(OC::OCException& e)
739             {
740                 result = e.code();
741             }
742         }
743         std::thread exec(context->callback, serverHeaderOptions, attrs,
744                     result, sequenceNumber);
745         exec.detach();
746         if (sequenceNumber == OC_OBSERVE_DEREGISTER)
747         {
748             return OC_STACK_DELETE_TRANSACTION;
749         }
750         return OC_STACK_KEEP_TRANSACTION;
751     }
752
753     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
754         const OCDevAddr& devAddr,
755         const std::string& uri,
756         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
757         ObserveCallback& callback, QualityOfService QoS)
758     {
759         if (!callback)
760         {
761             return OC_STACK_INVALID_PARAM;
762         }
763         OCStackResult result;
764
765         ClientCallbackContext::ObserveContext* ctx =
766             new ClientCallbackContext::ObserveContext(callback);
767         OCCallbackData cbdata(
768                 static_cast<void*>(ctx),
769                 observeResourceCallback,
770                 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
771                 );
772
773         OCMethod method;
774         if (observeType == ObserveType::Observe)
775         {
776             method = OC_REST_OBSERVE;
777         }
778         else if (observeType == ObserveType::ObserveAll)
779         {
780             method = OC_REST_OBSERVE_ALL;
781         }
782         else
783         {
784             method = OC_REST_OBSERVE_ALL;
785         }
786
787         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
788
789         auto cLock = m_csdkLock.lock();
790
791         if (cLock)
792         {
793             std::lock_guard<std::recursive_mutex> lock(*cLock);
794             OCHeaderOption options[MAX_HEADER_OPTIONS];
795
796             result = OCDoResource(handle, method,
797                                   url.c_str(), &devAddr,
798                                   nullptr,
799                                   CT_DEFAULT,
800                                   static_cast<OCQualityOfService>(QoS),
801                                   &cbdata,
802                                   assembleHeaderOptions(options, headerOptions),
803                                   headerOptions.size());
804         }
805         else
806         {
807             delete ctx;
808             return OC_STACK_ERROR;
809         }
810
811         return result;
812     }
813
814     OCStackResult InProcClientWrapper::CancelObserveResource(
815             OCDoHandle handle,
816             const std::string& /*host*/,
817             const std::string& /*uri*/,
818             const HeaderOptions& headerOptions,
819             QualityOfService QoS)
820     {
821         OCStackResult result;
822         auto cLock = m_csdkLock.lock();
823
824         if (cLock)
825         {
826             std::lock_guard<std::recursive_mutex> lock(*cLock);
827             OCHeaderOption options[MAX_HEADER_OPTIONS];
828
829             result = OCCancel(handle,
830                     static_cast<OCQualityOfService>(QoS),
831                     assembleHeaderOptions(options, headerOptions),
832                     headerOptions.size());
833         }
834         else
835         {
836             result = OC_STACK_ERROR;
837         }
838
839         return result;
840     }
841
842     OCStackApplicationResult subscribePresenceCallback(void* ctx,
843                                                        OCDoHandle /*handle*/,
844             OCClientResponse* clientResponse)
845     {
846         ClientCallbackContext::SubscribePresenceContext* context =
847         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
848
849         /*
850          * This a hack while we rethink presence subscription.
851          */
852         std::string url = clientResponse->devAddr.addr;
853
854         std::thread exec(context->callback, clientResponse->result,
855                     clientResponse->sequenceNumber, url);
856
857         exec.detach();
858
859         return OC_STACK_KEEP_TRANSACTION;
860     }
861
862     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
863         const std::string& host, const std::string& resourceType,
864         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
865     {
866         if (!presenceHandler)
867         {
868             return OC_STACK_INVALID_PARAM;
869         }
870
871         ClientCallbackContext::SubscribePresenceContext* ctx =
872             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
873         OCCallbackData cbdata(
874                 static_cast<void*>(ctx),
875                 subscribePresenceCallback,
876                 [](void* c)
877                 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
878                 );
879
880         auto cLock = m_csdkLock.lock();
881
882         std::ostringstream os;
883         os << host << OC_RSRVD_PRESENCE_URI;
884
885         if (!resourceType.empty())
886         {
887             os << "?rt=" << resourceType;
888         }
889
890         if (!cLock)
891         {
892             delete ctx;
893             return OC_STACK_ERROR;
894         }
895
896         return OCDoResource(handle, OC_REST_PRESENCE,
897                             os.str().c_str(), nullptr,
898                             nullptr, connectivityType,
899                             OC_LOW_QOS, &cbdata, NULL, 0);
900     }
901
902     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
903     {
904         OCStackResult result;
905         auto cLock = m_csdkLock.lock();
906
907         if (cLock)
908         {
909             std::lock_guard<std::recursive_mutex> lock(*cLock);
910             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
911         }
912         else
913         {
914             result = OC_STACK_ERROR;
915         }
916
917         return result;
918     }
919
920     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
921     {
922         qos = m_cfg.QoS;
923         return OC_STACK_OK;
924     }
925
926     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
927            const HeaderOptions& headerOptions)
928     {
929         int i = 0;
930
931         if ( headerOptions.size() == 0)
932         {
933             return nullptr;
934         }
935
936         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
937         {
938             options[i] = OCHeaderOption(OC_COAP_ID,
939                     it->getOptionID(),
940                     it->getOptionData().length() + 1,
941                     reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));
942             i++;
943         }
944
945         return options;
946     }
947
948     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
949     {
950         if (!dev)
951         {
952             return nullptr;
953         }
954
955         OCDPDev_t* result = new OCDPDev_t(*dev);
956         result->prm = new OCPrm_t[dev->prmLen];
957         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
958         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
959     }
960
961     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
962     {
963         while(list)
964         {
965             dpList.push_back(cloneDevice(list));
966             list = list->next;
967         }
968     }
969
970     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
971             GetDirectPairedCallback& callback)
972     {
973         if (!callback || 0 == waittime)
974         {
975             return OC_STACK_INVALID_PARAM;
976         }
977
978         OCStackResult result = OC_STACK_ERROR;
979         const OCDPDev_t *list = nullptr;
980         PairedDevices dpDeviceList;
981
982         auto cLock = m_csdkLock.lock();
983
984         if (cLock)
985         {
986             std::lock_guard<std::recursive_mutex> lock(*cLock);
987
988             list = OCDiscoverDirectPairingDevices(waittime);
989             if (NULL == list)
990             {
991                 result = OC_STACK_NO_RESOURCE;
992                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
993                     << std::flush;
994             }
995             else {
996                 convert(list, dpDeviceList);
997                 std::thread exec(callback, dpDeviceList);
998                 exec.detach();
999                 result = OC_STACK_OK;
1000             }
1001         }
1002         else
1003         {
1004             result = OC_STACK_ERROR;
1005         }
1006
1007         return result;
1008     }
1009
1010     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1011     {
1012         if (!callback)
1013         {
1014             return OC_STACK_INVALID_PARAM;
1015         }
1016
1017         OCStackResult result = OC_STACK_ERROR;
1018         const OCDPDev_t *list = nullptr;
1019         PairedDevices dpDeviceList;
1020
1021         auto cLock = m_csdkLock.lock();
1022
1023         if (cLock)
1024         {
1025             std::lock_guard<std::recursive_mutex> lock(*cLock);
1026
1027             list = OCGetDirectPairedDevices();
1028             if (NULL == list)
1029             {
1030                 result = OC_STACK_NO_RESOURCE;
1031                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1032                     << std::flush;
1033             }
1034             else {
1035                 convert(list, dpDeviceList);
1036                 std::thread exec(callback, dpDeviceList);
1037                 exec.detach();
1038                 result = OC_STACK_OK;
1039             }
1040         }
1041         else
1042         {
1043             result = OC_STACK_ERROR;
1044         }
1045
1046         return result;
1047     }
1048
1049     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1050             OCStackResult result)
1051     {
1052
1053         ClientCallbackContext::DirectPairingContext* context =
1054             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1055
1056         std::thread exec(context->callback, cloneDevice(peer), result);
1057         exec.detach();
1058     }
1059
1060     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1061             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1062     {
1063         if (!peer || !callback)
1064         {
1065             oclog() << "Invalid parameters" << std::flush;
1066             return OC_STACK_INVALID_PARAM;
1067         }
1068
1069         OCStackResult result = OC_STACK_ERROR;
1070         ClientCallbackContext::DirectPairingContext* context =
1071             new ClientCallbackContext::DirectPairingContext(callback);
1072
1073         auto cLock = m_csdkLock.lock();
1074         if (cLock)
1075         {
1076             std::lock_guard<std::recursive_mutex> lock(*cLock);
1077             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1078                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1079         }
1080         else
1081         {
1082             delete context;
1083             result = OC_STACK_ERROR;
1084         }
1085         return result;
1086     }
1087 }