Merge remote-tracking branch 'origin/extended-easysetup'
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try{
169             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
170                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
171             // loop to ensure valid construction of all resources
172
173             for(auto resource : container.Resources())
174             {
175                 std::thread exec(context->callback, resource);
176                 exec.detach();
177             }
178         }
179         catch (std::exception &e){
180             oclog() << "Exception in listCallback, ignoring response: "
181                     << e.what() << std::flush;
182         }
183
184
185         return OC_STACK_KEEP_TRANSACTION;
186     }
187
188     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
189         OCClientResponse* clientResponse)
190     {
191         if (!ctx || !clientResponse)
192         {
193             return OC_STACK_KEEP_TRANSACTION;
194         }
195
196         ClientCallbackContext::ListenErrorContext* context =
197             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
198         if (!context)
199         {
200             return OC_STACK_KEEP_TRANSACTION;
201         }
202
203         OCStackResult result = clientResponse->result;
204         if (result == OC_STACK_OK)
205         {
206             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
207             {
208                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
209                     << std::flush;
210                 return OC_STACK_KEEP_TRANSACTION;
211             }
212
213             auto clientWrapper = context->clientWrapper.lock();
214
215             if (!clientWrapper)
216             {
217                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
218                         << std::flush;
219                 return OC_STACK_KEEP_TRANSACTION;
220             }
221
222             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
223                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
224             // loop to ensure valid construction of all resources
225             for (auto resource : container.Resources())
226             {
227                 std::thread exec(context->callback, resource);
228                 exec.detach();
229             }
230             return OC_STACK_KEEP_TRANSACTION;
231         }
232
233         std::string resourceURI = clientResponse->resourceUri;
234         std::thread exec(context->errorCallback, resourceURI, result);
235         exec.detach();
236         return OC_STACK_DELETE_TRANSACTION;
237     }
238
239     OCStackResult InProcClientWrapper::ListenForResource(
240             const std::string& serviceUrl,
241             const std::string& resourceType,
242             OCConnectivityType connectivityType,
243             FindCallback& callback, QualityOfService QoS)
244     {
245         if (!callback)
246         {
247             return OC_STACK_INVALID_PARAM;
248         }
249
250         OCStackResult result;
251         ostringstream resourceUri;
252         resourceUri << serviceUrl << resourceType;
253
254         ClientCallbackContext::ListenContext* context =
255             new ClientCallbackContext::ListenContext(callback, shared_from_this());
256         OCCallbackData cbdata;
257         cbdata.context = static_cast<void*>(context),
258         cbdata.cb      = listenCallback;
259         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
260
261         auto cLock = m_csdkLock.lock();
262         if (cLock)
263         {
264             std::lock_guard<std::recursive_mutex> lock(*cLock);
265             result = OCDoResource(nullptr, OC_REST_DISCOVER,
266                                   resourceUri.str().c_str(),
267                                   nullptr, nullptr, connectivityType,
268                                   static_cast<OCQualityOfService>(QoS),
269                                   &cbdata,
270                                   nullptr, 0);
271         }
272         else
273         {
274             delete context;
275             result = OC_STACK_ERROR;
276         }
277         return result;
278     }
279
280     OCStackResult InProcClientWrapper::ListenErrorForResource(
281             const std::string& serviceUrl,
282             const std::string& resourceType,
283             OCConnectivityType connectivityType,
284             FindCallback& callback, FindErrorCallback& errorCallback,
285             QualityOfService QoS)
286     {
287         if (!callback)
288         {
289             return OC_STACK_INVALID_PARAM;
290         }
291
292         ostringstream resourceUri;
293         resourceUri << serviceUrl << resourceType;
294
295         ClientCallbackContext::ListenErrorContext* context =
296             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
297                                                           shared_from_this());
298         if (!context)
299         {
300             return OC_STACK_ERROR;
301         }
302
303         OCCallbackData cbdata(
304                 static_cast<void*>(context),
305                 listenErrorCallback,
306                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
307             );
308
309         OCStackResult result;
310         auto cLock = m_csdkLock.lock();
311         if (cLock)
312         {
313             std::lock_guard<std::recursive_mutex> lock(*cLock);
314             result = OCDoResource(nullptr, OC_REST_DISCOVER,
315                                   resourceUri.str().c_str(),
316                                   nullptr, nullptr, connectivityType,
317                                   static_cast<OCQualityOfService>(QoS),
318                                   &cbdata,
319                                   nullptr, 0);
320         }
321         else
322         {
323             delete context;
324             result = OC_STACK_ERROR;
325         }
326         return result;
327     }
328 #ifdef WITH_MQ
329     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
330             OCClientResponse* clientResponse)
331     {
332         ClientCallbackContext::ListenContext* context =
333             static_cast<ClientCallbackContext::ListenContext*>(ctx);
334
335         if (!clientResponse)
336         {
337             return OC_STACK_KEEP_TRANSACTION;
338         }
339
340         if (clientResponse->result != OC_STACK_OK)
341         {
342             oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
343                     << clientResponse->result
344                     << std::flush;
345
346             return OC_STACK_KEEP_TRANSACTION;
347         }
348
349         auto clientWrapper = context->clientWrapper.lock();
350         if (!clientWrapper)
351         {
352             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
353                     << std::flush;
354             return OC_STACK_KEEP_TRANSACTION;
355         }
356
357         try{
358             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
359                                         (OCRepPayload *) clientResponse->payload);
360
361             // loop to ensure valid construction of all resources
362             for (auto resource : container.Resources())
363             {
364                 std::thread exec(context->callback, resource);
365                 exec.detach();
366             }
367         }
368         catch (std::exception &e){
369             oclog() << "Exception in listCallback, ignoring response: "
370                     << e.what() << std::flush;
371         }
372
373
374         return OC_STACK_KEEP_TRANSACTION;
375     }
376
377     OCStackResult InProcClientWrapper::ListenForMQTopic(
378             const OCDevAddr& devAddr,
379             const std::string& resourceUri,
380             const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
381             FindCallback& callback, QualityOfService QoS)
382     {
383         oclog() << "ListenForMQTopic()" << std::flush;
384
385         if (!callback)
386         {
387             return OC_STACK_INVALID_PARAM;
388         }
389
390         ClientCallbackContext::ListenContext* context =
391             new ClientCallbackContext::ListenContext(callback, shared_from_this());
392         OCCallbackData cbdata;
393         cbdata.context = static_cast<void*>(context),
394         cbdata.cb      = listenMQCallback;
395         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
396
397         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
398
399         OCStackResult result = OC_STACK_ERROR;
400         auto cLock = m_csdkLock.lock();
401         if (cLock)
402         {
403             std::lock_guard<std::recursive_mutex> lock(*cLock);
404             OCHeaderOption options[MAX_HEADER_OPTIONS];
405             result = OCDoResource(
406                                   nullptr, OC_REST_GET,
407                                   uri.c_str(),
408                                   &devAddr, nullptr,
409                                   CT_DEFAULT,
410                                   static_cast<OCQualityOfService>(QoS),
411                                   &cbdata,
412                                   assembleHeaderOptions(options, headerOptions),
413                                   headerOptions.size());
414         }
415         else
416         {
417             delete context;
418         }
419
420         return result;
421     }
422 #endif
423
424     OCStackApplicationResult listenDeviceCallback(void* ctx,
425                                                   OCDoHandle /*handle*/,
426             OCClientResponse* clientResponse)
427     {
428         ClientCallbackContext::DeviceListenContext* context =
429             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
430
431         try
432         {
433             OCRepresentation rep = parseGetSetCallback(clientResponse);
434             std::thread exec(context->callback, rep);
435             exec.detach();
436         }
437         catch(OC::OCException& e)
438         {
439             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
440                 <<e.what() <<std::flush;
441         }
442
443         return OC_STACK_KEEP_TRANSACTION;
444     }
445
446     OCStackResult InProcClientWrapper::ListenForDevice(
447             const std::string& serviceUrl,
448             const std::string& deviceURI,
449             OCConnectivityType connectivityType,
450             FindDeviceCallback& callback,
451             QualityOfService QoS)
452     {
453         if (!callback)
454         {
455             return OC_STACK_INVALID_PARAM;
456         }
457         OCStackResult result;
458         ostringstream deviceUri;
459         deviceUri << serviceUrl << deviceURI;
460
461         ClientCallbackContext::DeviceListenContext* context =
462             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
463         OCCallbackData cbdata;
464
465         cbdata.context = static_cast<void*>(context),
466         cbdata.cb      = listenDeviceCallback;
467         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
468
469         auto cLock = m_csdkLock.lock();
470         if (cLock)
471         {
472             std::lock_guard<std::recursive_mutex> lock(*cLock);
473             result = OCDoResource(nullptr, OC_REST_DISCOVER,
474                                   deviceUri.str().c_str(),
475                                   nullptr, nullptr, connectivityType,
476                                   static_cast<OCQualityOfService>(QoS),
477                                   &cbdata,
478                                   nullptr, 0);
479         }
480         else
481         {
482             delete context;
483             result = OC_STACK_ERROR;
484         }
485         return result;
486     }
487
488     void parseServerHeaderOptions(OCClientResponse* clientResponse,
489                     HeaderOptions& serverHeaderOptions)
490     {
491         if (clientResponse)
492         {
493             // Parse header options from server
494             uint16_t optionID;
495             std::string optionData;
496
497             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
498             {
499                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
500                 optionData = reinterpret_cast<const char*>
501                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
502                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
503                 serverHeaderOptions.push_back(headerOption);
504             }
505         }
506         else
507         {
508             // clientResponse is invalid
509             // TODO check proper logging
510             std::cout << " Invalid response " << std::endl;
511         }
512     }
513
514 #ifdef WITH_MQ
515     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
516                     OCClientResponse* clientResponse)
517     {
518         ClientCallbackContext::CreateMQTopicContext* context =
519             static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
520         OCRepresentation rep;
521         HeaderOptions serverHeaderOptions;
522
523         if (!clientResponse)
524         {
525             return OC_STACK_DELETE_TRANSACTION;
526         }
527
528         std::string createdUri;
529         OCStackResult result = clientResponse->result;
530         if (OC_STACK_OK               == result ||
531             OC_STACK_RESOURCE_CREATED == result)
532         {
533             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
534             try
535             {
536                 rep = parseGetSetCallback(clientResponse);
537             }
538             catch(OC::OCException& e)
539             {
540                 result = e.code();
541             }
542
543             bool isLocationOption = false;
544             for (auto headerOption : serverHeaderOptions)
545             {
546                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
547                 {
548                     createdUri += "/";
549                     createdUri += headerOption.getOptionData();
550                     if (!isLocationOption)
551                     {
552                         isLocationOption = true;
553                     }
554                 }
555             }
556
557             if (!isLocationOption)
558             {
559                 createdUri = clientResponse->resourceUri;
560             }
561         }
562
563         auto clientWrapper = context->clientWrapper.lock();
564
565         if (!clientWrapper)
566         {
567             oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
568                     << std::flush;
569             return OC_STACK_DELETE_TRANSACTION;
570         }
571
572         try{
573             if (OC_STACK_OK               == result ||
574                 OC_STACK_RESOURCE_CREATED == result)
575             {
576                 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
577                                             createdUri);
578                 for (auto resource : container.Resources())
579                 {
580                     std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
581                     exec.detach();
582                 }
583             }
584             else
585             {
586                 std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
587                 exec.detach();
588             }
589         }
590         catch (std::exception &e){
591             oclog() << "Exception in createMQTopicCallback, ignoring response: "
592                     << e.what() << std::flush;
593         }
594         return OC_STACK_DELETE_TRANSACTION;
595     }
596
597     OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
598                 const OCDevAddr& devAddr,
599                 const std::string& uri,
600                 const OCRepresentation& rep,
601                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
602                 MQCreateTopicCallback& callback, QualityOfService QoS)
603     {
604         if (!callback)
605         {
606             return OC_STACK_INVALID_PARAM;
607         }
608         OCStackResult result;
609         ClientCallbackContext::CreateMQTopicContext* ctx =
610                 new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
611         OCCallbackData cbdata;
612         cbdata.context = static_cast<void*>(ctx),
613         cbdata.cb      = createMQTopicCallback;
614         cbdata.cd      = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
615
616         std::string url = assembleSetResourceUri(uri, queryParams);
617
618         auto cLock = m_csdkLock.lock();
619
620         if (cLock)
621         {
622             std::lock_guard<std::recursive_mutex> lock(*cLock);
623             OCHeaderOption options[MAX_HEADER_OPTIONS];
624
625             result = OCDoResource(nullptr, OC_REST_PUT,
626                                   url.c_str(), &devAddr,
627                                   assembleSetResourcePayload(rep),
628                                   CT_DEFAULT,
629                                   static_cast<OCQualityOfService>(QoS),
630                                   &cbdata,
631                                   assembleHeaderOptions(options, headerOptions),
632                                   headerOptions.size());
633         }
634         else
635         {
636             delete ctx;
637             result = OC_STACK_ERROR;
638         }
639
640         return result;
641     }
642 #endif
643     OCStackApplicationResult getResourceCallback(void* ctx,
644                                                  OCDoHandle /*handle*/,
645         OCClientResponse* clientResponse)
646     {
647         ClientCallbackContext::GetContext* context =
648             static_cast<ClientCallbackContext::GetContext*>(ctx);
649
650         OCRepresentation rep;
651         HeaderOptions serverHeaderOptions;
652         OCStackResult result = clientResponse->result;
653         if (result == OC_STACK_OK)
654         {
655             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
656             try
657             {
658                 rep = parseGetSetCallback(clientResponse);
659             }
660             catch(OC::OCException& e)
661             {
662                 result = e.code();
663             }
664         }
665
666         std::thread exec(context->callback, serverHeaderOptions, rep, result);
667         exec.detach();
668         return OC_STACK_DELETE_TRANSACTION;
669     }
670
671     OCStackResult InProcClientWrapper::GetResourceRepresentation(
672         const OCDevAddr& devAddr,
673         const std::string& resourceUri,
674         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
675         OCConnectivityType connectivityType,
676         GetCallback& callback, QualityOfService QoS)
677     {
678         if (!callback)
679         {
680             return OC_STACK_INVALID_PARAM;
681         }
682         OCStackResult result;
683         ClientCallbackContext::GetContext* ctx =
684             new ClientCallbackContext::GetContext(callback);
685         OCCallbackData cbdata;
686         cbdata.context = static_cast<void*>(ctx),
687         cbdata.cb      = getResourceCallback;
688         cbdata.cd      = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
689
690
691         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
692
693         auto cLock = m_csdkLock.lock();
694
695         if (cLock)
696         {
697             std::lock_guard<std::recursive_mutex> lock(*cLock);
698             OCHeaderOption options[MAX_HEADER_OPTIONS];
699
700             result = OCDoResource(
701                                   nullptr, OC_REST_GET,
702                                   uri.c_str(),
703                                   &devAddr, nullptr,
704                                   connectivityType,
705                                   static_cast<OCQualityOfService>(QoS),
706                                   &cbdata,
707                                   assembleHeaderOptions(options, headerOptions),
708                                   headerOptions.size());
709         }
710         else
711         {
712             delete ctx;
713             result = OC_STACK_ERROR;
714         }
715         return result;
716     }
717
718
719     OCStackApplicationResult setResourceCallback(void* ctx,
720                                                  OCDoHandle /*handle*/,
721         OCClientResponse* clientResponse)
722     {
723         ClientCallbackContext::SetContext* context =
724             static_cast<ClientCallbackContext::SetContext*>(ctx);
725         OCRepresentation attrs;
726         HeaderOptions serverHeaderOptions;
727
728         OCStackResult result = clientResponse->result;
729         if (OC_STACK_OK               == result ||
730             OC_STACK_RESOURCE_CREATED == result ||
731             OC_STACK_RESOURCE_DELETED == result ||
732             OC_STACK_RESOURCE_CHANGED == result)
733         {
734             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
735             try
736             {
737                 attrs = parseGetSetCallback(clientResponse);
738             }
739             catch(OC::OCException& e)
740             {
741                 result = e.code();
742             }
743         }
744
745         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
746         exec.detach();
747         return OC_STACK_DELETE_TRANSACTION;
748     }
749
750     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
751         const QueryParamsMap& queryParams)
752     {
753         if (!uri.empty())
754         {
755             if (uri.back() == '/')
756             {
757                 uri.resize(uri.size() - 1);
758             }
759         }
760
761         ostringstream paramsList;
762         if (queryParams.size() > 0)
763         {
764             paramsList << '?';
765         }
766
767         for (auto& param : queryParams)
768         {
769             paramsList << param.first <<'='<<param.second<<';';
770         }
771
772         std::string queryString = paramsList.str();
773
774         if (queryString.empty())
775         {
776             return uri;
777         }
778
779         if (queryString.back() == ';')
780         {
781             queryString.resize(queryString.size() - 1);
782         }
783
784         std::string ret = uri + queryString;
785         return ret;
786     }
787
788     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
789         const QueryParamsList& queryParams)
790     {
791         if (!uri.empty())
792         {
793             if (uri.back() == '/')
794             {
795                 uri.resize(uri.size() - 1);
796             }
797         }
798
799         ostringstream paramsList;
800         if (queryParams.size() > 0)
801         {
802             paramsList << '?';
803         }
804
805         for (auto& param : queryParams)
806         {
807             for (auto& paramList : param.second)
808             {
809                 paramsList << param.first << '=' << paramList;
810                 if (paramList != param.second.back())
811                 {
812                     paramsList << '&';
813                 }
814             }
815             paramsList << ';';
816         }
817
818         std::string queryString = paramsList.str();
819
820         if (queryString.empty())
821         {
822             return uri;
823         }
824
825         if (queryString.back() == ';')
826         {
827             queryString.resize(queryString.size() - 1);
828         }
829
830         std::string ret = uri + queryString;
831         return ret;
832     }
833
834     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
835     {
836         MessageContainer ocInfo;
837         ocInfo.addRepresentation(rep);
838         for(const OCRepresentation& r : rep.getChildren())
839         {
840             ocInfo.addRepresentation(r);
841         }
842
843         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
844     }
845
846     OCStackResult InProcClientWrapper::PostResourceRepresentation(
847         const OCDevAddr& devAddr,
848         const std::string& uri,
849         const OCRepresentation& rep,
850         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
851         OCConnectivityType connectivityType,
852         PostCallback& callback, QualityOfService QoS)
853     {
854         if (!callback)
855         {
856             return OC_STACK_INVALID_PARAM;
857         }
858         OCStackResult result;
859         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
860         OCCallbackData cbdata;
861         cbdata.context = static_cast<void*>(ctx),
862         cbdata.cb      = setResourceCallback;
863         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
864
865
866         std::string url = assembleSetResourceUri(uri, queryParams);
867
868         auto cLock = m_csdkLock.lock();
869
870         if (cLock)
871         {
872             std::lock_guard<std::recursive_mutex> lock(*cLock);
873             OCHeaderOption options[MAX_HEADER_OPTIONS];
874
875             result = OCDoResource(nullptr, OC_REST_POST,
876                                   url.c_str(), &devAddr,
877                                   assembleSetResourcePayload(rep),
878                                   connectivityType,
879                                   static_cast<OCQualityOfService>(QoS),
880                                   &cbdata,
881                                   assembleHeaderOptions(options, headerOptions),
882                                   headerOptions.size());
883         }
884         else
885         {
886             delete ctx;
887             result = OC_STACK_ERROR;
888         }
889
890         return result;
891     }
892
893     OCStackResult InProcClientWrapper::PutResourceRepresentation(
894         const OCDevAddr& devAddr,
895         const std::string& uri,
896         const OCRepresentation& rep,
897         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
898         PutCallback& callback, QualityOfService QoS)
899     {
900         if (!callback)
901         {
902             return OC_STACK_INVALID_PARAM;
903         }
904         OCStackResult result;
905         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
906         OCCallbackData cbdata;
907         cbdata.context = static_cast<void*>(ctx),
908         cbdata.cb      = setResourceCallback;
909         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
910
911
912         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
913
914         auto cLock = m_csdkLock.lock();
915
916         if (cLock)
917         {
918             std::lock_guard<std::recursive_mutex> lock(*cLock);
919             OCDoHandle handle;
920             OCHeaderOption options[MAX_HEADER_OPTIONS];
921
922             result = OCDoResource(&handle, OC_REST_PUT,
923                                   url.c_str(), &devAddr,
924                                   assembleSetResourcePayload(rep),
925                                   CT_DEFAULT,
926                                   static_cast<OCQualityOfService>(QoS),
927                                   &cbdata,
928                                   assembleHeaderOptions(options, headerOptions),
929                                   headerOptions.size());
930         }
931         else
932         {
933             delete ctx;
934             result = OC_STACK_ERROR;
935         }
936
937         return result;
938     }
939
940     OCStackApplicationResult deleteResourceCallback(void* ctx,
941                                                     OCDoHandle /*handle*/,
942         OCClientResponse* clientResponse)
943     {
944         ClientCallbackContext::DeleteContext* context =
945             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
946         HeaderOptions serverHeaderOptions;
947
948         if (clientResponse->result == OC_STACK_OK)
949         {
950             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
951         }
952         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
953         exec.detach();
954         return OC_STACK_DELETE_TRANSACTION;
955     }
956
957     OCStackResult InProcClientWrapper::DeleteResource(
958         const OCDevAddr& devAddr,
959         const std::string& uri,
960         const HeaderOptions& headerOptions,
961         OCConnectivityType connectivityType,
962         DeleteCallback& callback,
963         QualityOfService /*QoS*/)
964     {
965         if (!callback)
966         {
967             return OC_STACK_INVALID_PARAM;
968         }
969         OCStackResult result;
970         ClientCallbackContext::DeleteContext* ctx =
971             new ClientCallbackContext::DeleteContext(callback);
972         OCCallbackData cbdata;
973         cbdata.context = static_cast<void*>(ctx),
974         cbdata.cb      = deleteResourceCallback;
975         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
976
977
978         auto cLock = m_csdkLock.lock();
979
980         if (cLock)
981         {
982             OCHeaderOption options[MAX_HEADER_OPTIONS];
983
984             std::lock_guard<std::recursive_mutex> lock(*cLock);
985
986             result = OCDoResource(nullptr, OC_REST_DELETE,
987                                   uri.c_str(), &devAddr,
988                                   nullptr,
989                                   connectivityType,
990                                   static_cast<OCQualityOfService>(m_cfg.QoS),
991                                   &cbdata,
992                                   assembleHeaderOptions(options, headerOptions),
993                                   headerOptions.size());
994         }
995         else
996         {
997             delete ctx;
998             result = OC_STACK_ERROR;
999         }
1000
1001         return result;
1002     }
1003
1004     OCStackApplicationResult observeResourceCallback(void* ctx,
1005                                                      OCDoHandle /*handle*/,
1006         OCClientResponse* clientResponse)
1007     {
1008         ClientCallbackContext::ObserveContext* context =
1009             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
1010         OCRepresentation attrs;
1011         HeaderOptions serverHeaderOptions;
1012         uint32_t sequenceNumber = clientResponse->sequenceNumber;
1013         OCStackResult result = clientResponse->result;
1014         if (clientResponse->result == OC_STACK_OK)
1015         {
1016             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
1017             try
1018             {
1019                 attrs = parseGetSetCallback(clientResponse);
1020             }
1021             catch(OC::OCException& e)
1022             {
1023                 result = e.code();
1024             }
1025         }
1026         std::thread exec(context->callback, serverHeaderOptions, attrs,
1027                     result, sequenceNumber);
1028         exec.detach();
1029
1030         return OC_STACK_KEEP_TRANSACTION;
1031     }
1032
1033     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
1034         const OCDevAddr& devAddr,
1035         const std::string& uri,
1036         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
1037         ObserveCallback& callback, QualityOfService QoS)
1038     {
1039         if (!callback)
1040         {
1041             return OC_STACK_INVALID_PARAM;
1042         }
1043         OCStackResult result;
1044
1045         ClientCallbackContext::ObserveContext* ctx =
1046             new ClientCallbackContext::ObserveContext(callback);
1047         OCCallbackData cbdata;
1048         cbdata.context = static_cast<void*>(ctx),
1049         cbdata.cb      = observeResourceCallback;
1050         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1051
1052
1053         OCMethod method;
1054         if (observeType == ObserveType::Observe)
1055         {
1056             method = OC_REST_OBSERVE;
1057         }
1058         else if (observeType == ObserveType::ObserveAll)
1059         {
1060             method = OC_REST_OBSERVE_ALL;
1061         }
1062         else
1063         {
1064             method = OC_REST_OBSERVE_ALL;
1065         }
1066
1067         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
1068
1069         auto cLock = m_csdkLock.lock();
1070
1071         if (cLock)
1072         {
1073             std::lock_guard<std::recursive_mutex> lock(*cLock);
1074             OCHeaderOption options[MAX_HEADER_OPTIONS];
1075
1076             result = OCDoResource(handle, method,
1077                                   url.c_str(), &devAddr,
1078                                   nullptr,
1079                                   CT_DEFAULT,
1080                                   static_cast<OCQualityOfService>(QoS),
1081                                   &cbdata,
1082                                   assembleHeaderOptions(options, headerOptions),
1083                                   headerOptions.size());
1084         }
1085         else
1086         {
1087             delete ctx;
1088             return OC_STACK_ERROR;
1089         }
1090
1091         return result;
1092     }
1093
1094     OCStackResult InProcClientWrapper::CancelObserveResource(
1095             OCDoHandle handle,
1096             const std::string& /*host*/,
1097             const std::string& /*uri*/,
1098             const HeaderOptions& headerOptions,
1099             QualityOfService QoS)
1100     {
1101         OCStackResult result;
1102         auto cLock = m_csdkLock.lock();
1103
1104         if (cLock)
1105         {
1106             std::lock_guard<std::recursive_mutex> lock(*cLock);
1107             OCHeaderOption options[MAX_HEADER_OPTIONS];
1108
1109             result = OCCancel(handle,
1110                     static_cast<OCQualityOfService>(QoS),
1111                     assembleHeaderOptions(options, headerOptions),
1112                     headerOptions.size());
1113         }
1114         else
1115         {
1116             result = OC_STACK_ERROR;
1117         }
1118
1119         return result;
1120     }
1121
1122     OCStackApplicationResult subscribePresenceCallback(void* ctx,
1123                                                        OCDoHandle /*handle*/,
1124             OCClientResponse* clientResponse)
1125     {
1126         ClientCallbackContext::SubscribePresenceContext* context =
1127         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
1128
1129         /*
1130          * This a hack while we rethink presence subscription.
1131          */
1132         std::string url = clientResponse->devAddr.addr;
1133
1134         std::thread exec(context->callback, clientResponse->result,
1135                     clientResponse->sequenceNumber, url);
1136
1137         exec.detach();
1138
1139         return OC_STACK_KEEP_TRANSACTION;
1140     }
1141
1142     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
1143         const std::string& host, const std::string& resourceType,
1144         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
1145     {
1146         if (!presenceHandler)
1147         {
1148             return OC_STACK_INVALID_PARAM;
1149         }
1150
1151         ClientCallbackContext::SubscribePresenceContext* ctx =
1152             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
1153         OCCallbackData cbdata;
1154         cbdata.context = static_cast<void*>(ctx),
1155         cbdata.cb      = subscribePresenceCallback;
1156         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
1157
1158
1159         auto cLock = m_csdkLock.lock();
1160
1161         std::ostringstream os;
1162         os << host << OC_RSRVD_PRESENCE_URI;
1163
1164         if (!resourceType.empty())
1165         {
1166             os << "?rt=" << resourceType;
1167         }
1168
1169         if (!cLock)
1170         {
1171             delete ctx;
1172             return OC_STACK_ERROR;
1173         }
1174
1175         return OCDoResource(handle, OC_REST_PRESENCE,
1176                             os.str().c_str(), nullptr,
1177                             nullptr, connectivityType,
1178                             OC_LOW_QOS, &cbdata, NULL, 0);
1179     }
1180
1181     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
1182     {
1183         OCStackResult result;
1184         auto cLock = m_csdkLock.lock();
1185
1186         if (cLock)
1187         {
1188             std::lock_guard<std::recursive_mutex> lock(*cLock);
1189             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
1190         }
1191         else
1192         {
1193             result = OC_STACK_ERROR;
1194         }
1195
1196         return result;
1197     }
1198
1199 #ifdef WITH_CLOUD
1200     OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
1201                                                                const std::string& host,
1202                                                                const std::vector<std::string>& di,
1203                                                                OCConnectivityType connectivityType,
1204                                                                ObserveCallback& callback)
1205     {
1206         if (!callback)
1207         {
1208             return OC_STACK_INVALID_PARAM;
1209         }
1210         OCStackResult result;
1211
1212         ClientCallbackContext::ObserveContext* ctx =
1213             new ClientCallbackContext::ObserveContext(callback);
1214         OCCallbackData cbdata;
1215         cbdata.context = static_cast<void*>(ctx),
1216         cbdata.cb      = observeResourceCallback;
1217         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1218
1219         auto cLock = m_csdkLock.lock();
1220
1221         if (cLock)
1222         {
1223             std::lock_guard<std::recursive_mutex> lock(*cLock);
1224
1225             std::ostringstream os;
1226             os << host << OCF_RSRVD_DEVICE_PRESENCE_URI;
1227             QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
1228             std::string url = assembleSetResourceUri(os.str(), queryParams);
1229
1230             result = OCDoResource(handle, OC_REST_OBSERVE,
1231                                   url.c_str(), nullptr,
1232                                   nullptr, connectivityType,
1233                                   OC_LOW_QOS, &cbdata,
1234                                   nullptr, 0);
1235         }
1236         else
1237         {
1238             delete ctx;
1239             result = OC_STACK_ERROR;
1240         }
1241
1242         return result;
1243     }
1244 #endif
1245
1246     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
1247     {
1248         qos = m_cfg.QoS;
1249         return OC_STACK_OK;
1250     }
1251
1252     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
1253            const HeaderOptions& headerOptions)
1254     {
1255         int i = 0;
1256
1257         if ( headerOptions.size() == 0)
1258         {
1259             return nullptr;
1260         }
1261
1262         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
1263         {
1264             options[i] = OCHeaderOption();
1265             options[i].protocolID = OC_COAP_ID;
1266             options[i].optionID = it->getOptionID();
1267             options[i].optionLength = it->getOptionData().length() + 1;
1268             strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));
1269             i++;
1270         }
1271
1272         return options;
1273     }
1274
1275     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
1276     {
1277         if (!dev)
1278         {
1279             return nullptr;
1280         }
1281
1282         OCDPDev_t* result = new OCDPDev_t(*dev);
1283         result->prm = new OCPrm_t[dev->prmLen];
1284         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
1285         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
1286     }
1287
1288     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
1289     {
1290         while(list)
1291         {
1292             dpList.push_back(cloneDevice(list));
1293             list = list->next;
1294         }
1295     }
1296
1297     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
1298             GetDirectPairedCallback& callback)
1299     {
1300         if (!callback || 0 == waittime)
1301         {
1302             return OC_STACK_INVALID_PARAM;
1303         }
1304
1305         OCStackResult result = OC_STACK_ERROR;
1306         const OCDPDev_t *list = nullptr;
1307         PairedDevices dpDeviceList;
1308
1309         auto cLock = m_csdkLock.lock();
1310
1311         if (cLock)
1312         {
1313             std::lock_guard<std::recursive_mutex> lock(*cLock);
1314
1315             list = OCDiscoverDirectPairingDevices(waittime);
1316             if (NULL == list)
1317             {
1318                 result = OC_STACK_NO_RESOURCE;
1319                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1320                     << std::flush;
1321             }
1322             else {
1323                 convert(list, dpDeviceList);
1324                 std::thread exec(callback, dpDeviceList);
1325                 exec.detach();
1326                 result = OC_STACK_OK;
1327             }
1328         }
1329         else
1330         {
1331             result = OC_STACK_ERROR;
1332         }
1333
1334         return result;
1335     }
1336
1337     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1338     {
1339         if (!callback)
1340         {
1341             return OC_STACK_INVALID_PARAM;
1342         }
1343
1344         OCStackResult result = OC_STACK_ERROR;
1345         const OCDPDev_t *list = nullptr;
1346         PairedDevices dpDeviceList;
1347
1348         auto cLock = m_csdkLock.lock();
1349
1350         if (cLock)
1351         {
1352             std::lock_guard<std::recursive_mutex> lock(*cLock);
1353
1354             list = OCGetDirectPairedDevices();
1355             if (NULL == list)
1356             {
1357                 result = OC_STACK_NO_RESOURCE;
1358                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1359                     << std::flush;
1360             }
1361             else {
1362                 convert(list, dpDeviceList);
1363                 std::thread exec(callback, dpDeviceList);
1364                 exec.detach();
1365                 result = OC_STACK_OK;
1366             }
1367         }
1368         else
1369         {
1370             result = OC_STACK_ERROR;
1371         }
1372
1373         return result;
1374     }
1375
1376     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1377             OCStackResult result)
1378     {
1379
1380         ClientCallbackContext::DirectPairingContext* context =
1381             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1382
1383         std::thread exec(context->callback, cloneDevice(peer), result);
1384         exec.detach();
1385     }
1386
1387     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1388             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1389     {
1390         if (!peer || !callback)
1391         {
1392             oclog() << "Invalid parameters" << std::flush;
1393             return OC_STACK_INVALID_PARAM;
1394         }
1395
1396         OCStackResult result = OC_STACK_ERROR;
1397         ClientCallbackContext::DirectPairingContext* context =
1398             new ClientCallbackContext::DirectPairingContext(callback);
1399
1400         auto cLock = m_csdkLock.lock();
1401         if (cLock)
1402         {
1403             std::lock_guard<std::recursive_mutex> lock(*cLock);
1404             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1405                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1406         }
1407         else
1408         {
1409             delete context;
1410             result = OC_STACK_ERROR;
1411         }
1412         return result;
1413     }
1414 }