Merge branch 'master' into notification-service
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try{
169             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
170                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
171             // loop to ensure valid construction of all resources
172
173             for(auto resource : container.Resources())
174             {
175                 std::thread exec(context->callback, resource);
176                 exec.detach();
177             }
178         }
179         catch (std::exception &e){
180             oclog() << "Exception in listCallback, ignoring response: "
181                     << e.what() << std::flush;
182         }
183
184
185         return OC_STACK_KEEP_TRANSACTION;
186     }
187
188     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
189         OCClientResponse* clientResponse)
190     {
191         if (!ctx || !clientResponse)
192         {
193             return OC_STACK_KEEP_TRANSACTION;
194         }
195
196         ClientCallbackContext::ListenErrorContext* context =
197             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
198         if (!context)
199         {
200             return OC_STACK_KEEP_TRANSACTION;
201         }
202
203         OCStackResult result = clientResponse->result;
204         if (result == OC_STACK_OK)
205         {
206             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
207             {
208                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
209                     << std::flush;
210                 return OC_STACK_KEEP_TRANSACTION;
211             }
212
213             auto clientWrapper = context->clientWrapper.lock();
214
215             if (!clientWrapper)
216             {
217                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
218                         << std::flush;
219                 return OC_STACK_KEEP_TRANSACTION;
220             }
221
222             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
223                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
224             // loop to ensure valid construction of all resources
225             for (auto resource : container.Resources())
226             {
227                 std::thread exec(context->callback, resource);
228                 exec.detach();
229             }
230             return OC_STACK_KEEP_TRANSACTION;
231         }
232
233         std::string resourceURI = clientResponse->resourceUri;
234         std::thread exec(context->errorCallback, resourceURI, result);
235         exec.detach();
236         return OC_STACK_DELETE_TRANSACTION;
237     }
238
239     OCStackResult InProcClientWrapper::ListenForResource(
240             const std::string& serviceUrl,
241             const std::string& resourceType,
242             OCConnectivityType connectivityType,
243             FindCallback& callback, QualityOfService QoS)
244     {
245         if (!callback)
246         {
247             return OC_STACK_INVALID_PARAM;
248         }
249
250         OCStackResult result;
251         ostringstream resourceUri;
252         resourceUri << serviceUrl << resourceType;
253
254         ClientCallbackContext::ListenContext* context =
255             new ClientCallbackContext::ListenContext(callback, shared_from_this());
256         OCCallbackData cbdata;
257         cbdata.context = static_cast<void*>(context),
258         cbdata.cb      = listenCallback;
259         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
260
261         auto cLock = m_csdkLock.lock();
262         if (cLock)
263         {
264             std::lock_guard<std::recursive_mutex> lock(*cLock);
265             result = OCDoResource(nullptr, OC_REST_DISCOVER,
266                                   resourceUri.str().c_str(),
267                                   nullptr, nullptr, connectivityType,
268                                   static_cast<OCQualityOfService>(QoS),
269                                   &cbdata,
270                                   nullptr, 0);
271         }
272         else
273         {
274             delete context;
275             result = OC_STACK_ERROR;
276         }
277         return result;
278     }
279
280     OCStackResult InProcClientWrapper::ListenErrorForResource(
281             const std::string& serviceUrl,
282             const std::string& resourceType,
283             OCConnectivityType connectivityType,
284             FindCallback& callback, FindErrorCallback& errorCallback,
285             QualityOfService QoS)
286     {
287         if (!callback)
288         {
289             return OC_STACK_INVALID_PARAM;
290         }
291
292         ostringstream resourceUri;
293         resourceUri << serviceUrl << resourceType;
294
295         ClientCallbackContext::ListenErrorContext* context =
296             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
297                                                           shared_from_this());
298         if (!context)
299         {
300             return OC_STACK_ERROR;
301         }
302
303         OCCallbackData cbdata(
304                 static_cast<void*>(context),
305                 listenErrorCallback,
306                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
307             );
308
309         OCStackResult result;
310         auto cLock = m_csdkLock.lock();
311         if (cLock)
312         {
313             std::lock_guard<std::recursive_mutex> lock(*cLock);
314             result = OCDoResource(nullptr, OC_REST_DISCOVER,
315                                   resourceUri.str().c_str(),
316                                   nullptr, nullptr, connectivityType,
317                                   static_cast<OCQualityOfService>(QoS),
318                                   &cbdata,
319                                   nullptr, 0);
320         }
321         else
322         {
323             delete context;
324             result = OC_STACK_ERROR;
325         }
326         return result;
327     }
328 #ifdef WITH_MQ
329     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
330             OCClientResponse* clientResponse)
331     {
332         ClientCallbackContext::ListenContext* context =
333             static_cast<ClientCallbackContext::ListenContext*>(ctx);
334
335         if (!clientResponse)
336         {
337             return OC_STACK_KEEP_TRANSACTION;
338         }
339
340         if (clientResponse->result != OC_STACK_OK)
341         {
342             oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
343                     << clientResponse->result
344                     << std::flush;
345
346             return OC_STACK_KEEP_TRANSACTION;
347         }
348
349         auto clientWrapper = context->clientWrapper.lock();
350         if (!clientWrapper)
351         {
352             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
353                     << std::flush;
354             return OC_STACK_KEEP_TRANSACTION;
355         }
356
357         try{
358             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
359                                         (OCRepPayload *) clientResponse->payload);
360
361             // loop to ensure valid construction of all resources
362             for (auto resource : container.Resources())
363             {
364                 std::thread exec(context->callback, resource);
365                 exec.detach();
366             }
367         }
368         catch (std::exception &e){
369             oclog() << "Exception in listCallback, ignoring response: "
370                     << e.what() << std::flush;
371         }
372
373
374         return OC_STACK_KEEP_TRANSACTION;
375     }
376
377     OCStackResult InProcClientWrapper::ListenForMQTopic(
378             const OCDevAddr& devAddr,
379             const std::string& resourceUri,
380             const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
381             FindCallback& callback, QualityOfService QoS)
382     {
383         oclog() << "ListenForMQTopic()" << std::flush;
384
385         if (!callback)
386         {
387             return OC_STACK_INVALID_PARAM;
388         }
389
390         ClientCallbackContext::ListenContext* context =
391             new ClientCallbackContext::ListenContext(callback, shared_from_this());
392         OCCallbackData cbdata;
393         cbdata.context = static_cast<void*>(context),
394         cbdata.cb      = listenMQCallback;
395         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
396
397         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
398
399         OCStackResult result = OC_STACK_ERROR;
400         auto cLock = m_csdkLock.lock();
401         if (cLock)
402         {
403             std::lock_guard<std::recursive_mutex> lock(*cLock);
404             OCHeaderOption options[MAX_HEADER_OPTIONS];
405             result = OCDoResource(
406                                   nullptr, OC_REST_GET,
407                                   uri.c_str(),
408                                   &devAddr, nullptr,
409                                   CT_DEFAULT,
410                                   static_cast<OCQualityOfService>(QoS),
411                                   &cbdata,
412                                   assembleHeaderOptions(options, headerOptions),
413                                   headerOptions.size());
414         }
415         else
416         {
417             delete context;
418         }
419
420         return result;
421     }
422 #endif
423
424     OCStackApplicationResult listenDeviceCallback(void* ctx,
425                                                   OCDoHandle /*handle*/,
426             OCClientResponse* clientResponse)
427     {
428         ClientCallbackContext::DeviceListenContext* context =
429             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
430
431         try
432         {
433             OCRepresentation rep = parseGetSetCallback(clientResponse);
434             std::thread exec(context->callback, rep);
435             exec.detach();
436         }
437         catch(OC::OCException& e)
438         {
439             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
440                 <<e.what() <<std::flush;
441         }
442
443         return OC_STACK_KEEP_TRANSACTION;
444     }
445
446     OCStackResult InProcClientWrapper::ListenForDevice(
447             const std::string& serviceUrl,
448             const std::string& deviceURI,
449             OCConnectivityType connectivityType,
450             FindDeviceCallback& callback,
451             QualityOfService QoS)
452     {
453         if (!callback)
454         {
455             return OC_STACK_INVALID_PARAM;
456         }
457         OCStackResult result;
458         ostringstream deviceUri;
459         deviceUri << serviceUrl << deviceURI;
460
461         ClientCallbackContext::DeviceListenContext* context =
462             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
463         OCCallbackData cbdata;
464
465         cbdata.context = static_cast<void*>(context),
466         cbdata.cb      = listenDeviceCallback;
467         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
468
469         auto cLock = m_csdkLock.lock();
470         if (cLock)
471         {
472             std::lock_guard<std::recursive_mutex> lock(*cLock);
473             result = OCDoResource(nullptr, OC_REST_DISCOVER,
474                                   deviceUri.str().c_str(),
475                                   nullptr, nullptr, connectivityType,
476                                   static_cast<OCQualityOfService>(QoS),
477                                   &cbdata,
478                                   nullptr, 0);
479         }
480         else
481         {
482             delete context;
483             result = OC_STACK_ERROR;
484         }
485         return result;
486     }
487
488     void parseServerHeaderOptions(OCClientResponse* clientResponse,
489                     HeaderOptions& serverHeaderOptions)
490     {
491         if (clientResponse)
492         {
493             // Parse header options from server
494             uint16_t optionID;
495             std::string optionData;
496
497             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
498             {
499                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
500                 optionData = reinterpret_cast<const char*>
501                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
502                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
503                 serverHeaderOptions.push_back(headerOption);
504             }
505         }
506         else
507         {
508             // clientResponse is invalid
509             // TODO check proper logging
510             std::cout << " Invalid response " << std::endl;
511         }
512     }
513
514 #ifdef WITH_MQ
515     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
516                     OCClientResponse* clientResponse)
517     {
518         ClientCallbackContext::CreateMQTopicContext* context =
519             static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
520         OCRepresentation rep;
521         HeaderOptions serverHeaderOptions;
522
523         if (!clientResponse)
524         {
525             return OC_STACK_DELETE_TRANSACTION;
526         }
527
528         std::string createdUri;
529         OCStackResult result = clientResponse->result;
530         if (OC_STACK_OK               == result ||
531             OC_STACK_RESOURCE_CREATED == result)
532         {
533             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
534             try
535             {
536                 rep = parseGetSetCallback(clientResponse);
537             }
538             catch(OC::OCException& e)
539             {
540                 result = e.code();
541             }
542
543             bool isLocationOption = false;
544             for (auto headerOption : serverHeaderOptions)
545             {
546                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
547                 {
548                     createdUri += "/";
549                     createdUri += headerOption.getOptionData();
550                     if (!isLocationOption)
551                     {
552                         isLocationOption = true;
553                     }
554                 }
555             }
556
557             if (!isLocationOption)
558             {
559                 createdUri = clientResponse->resourceUri;
560             }
561         }
562
563         auto clientWrapper = context->clientWrapper.lock();
564
565         if (!clientWrapper)
566         {
567             oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
568                     << std::flush;
569             return OC_STACK_DELETE_TRANSACTION;
570         }
571
572         try{
573             if (OC_STACK_OK               == result ||
574                 OC_STACK_RESOURCE_CREATED == result)
575             {
576                 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
577                                             createdUri);
578                 for (auto resource : container.Resources())
579                 {
580                     std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
581                     exec.detach();
582                 }
583             }
584             else
585             {
586                 std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
587                 exec.detach();
588             }
589         }
590         catch (std::exception &e){
591             oclog() << "Exception in createMQTopicCallback, ignoring response: "
592                     << e.what() << std::flush;
593         }
594         return OC_STACK_DELETE_TRANSACTION;
595     }
596
597     OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
598                 const OCDevAddr& devAddr,
599                 const std::string& uri,
600                 const OCRepresentation& rep,
601                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
602                 MQCreateTopicCallback& callback, QualityOfService QoS)
603     {
604         if (!callback)
605         {
606             return OC_STACK_INVALID_PARAM;
607         }
608         OCStackResult result;
609         ClientCallbackContext::CreateMQTopicContext* ctx =
610                 new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
611         OCCallbackData cbdata;
612         cbdata.context = static_cast<void*>(ctx),
613         cbdata.cb      = createMQTopicCallback;
614         cbdata.cd      = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
615
616         std::string url = assembleSetResourceUri(uri, queryParams);
617
618         auto cLock = m_csdkLock.lock();
619
620         if (cLock)
621         {
622             std::lock_guard<std::recursive_mutex> lock(*cLock);
623             OCHeaderOption options[MAX_HEADER_OPTIONS];
624
625             result = OCDoResource(nullptr, OC_REST_PUT,
626                                   url.c_str(), &devAddr,
627                                   assembleSetResourcePayload(rep),
628                                   CT_DEFAULT,
629                                   static_cast<OCQualityOfService>(QoS),
630                                   &cbdata,
631                                   assembleHeaderOptions(options, headerOptions),
632                                   headerOptions.size());
633         }
634         else
635         {
636             delete ctx;
637             result = OC_STACK_ERROR;
638         }
639
640         return result;
641     }
642 #endif
643     OCStackApplicationResult getResourceCallback(void* ctx,
644                                                  OCDoHandle /*handle*/,
645         OCClientResponse* clientResponse)
646     {
647         ClientCallbackContext::GetContext* context =
648             static_cast<ClientCallbackContext::GetContext*>(ctx);
649
650         OCRepresentation rep;
651         HeaderOptions serverHeaderOptions;
652         OCStackResult result = clientResponse->result;
653         if (result == OC_STACK_OK)
654         {
655             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
656             try
657             {
658                 rep = parseGetSetCallback(clientResponse);
659             }
660             catch(OC::OCException& e)
661             {
662                 result = e.code();
663             }
664         }
665
666         std::thread exec(context->callback, serverHeaderOptions, rep, result);
667         exec.detach();
668         return OC_STACK_DELETE_TRANSACTION;
669     }
670
671     OCStackResult InProcClientWrapper::GetResourceRepresentation(
672         const OCDevAddr& devAddr,
673         const std::string& resourceUri,
674         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
675         GetCallback& callback, QualityOfService QoS)
676     {
677         if (!callback)
678         {
679             return OC_STACK_INVALID_PARAM;
680         }
681         OCStackResult result;
682         ClientCallbackContext::GetContext* ctx =
683             new ClientCallbackContext::GetContext(callback);
684         OCCallbackData cbdata;
685         cbdata.context = static_cast<void*>(ctx),
686         cbdata.cb      = getResourceCallback;
687         cbdata.cd      = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
688
689
690         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
691
692         auto cLock = m_csdkLock.lock();
693
694         if (cLock)
695         {
696             std::lock_guard<std::recursive_mutex> lock(*cLock);
697             OCHeaderOption options[MAX_HEADER_OPTIONS];
698
699             result = OCDoResource(
700                                   nullptr, OC_REST_GET,
701                                   uri.c_str(),
702                                   &devAddr, nullptr,
703                                   CT_DEFAULT,
704                                   static_cast<OCQualityOfService>(QoS),
705                                   &cbdata,
706                                   assembleHeaderOptions(options, headerOptions),
707                                   headerOptions.size());
708         }
709         else
710         {
711             delete ctx;
712             result = OC_STACK_ERROR;
713         }
714         return result;
715     }
716
717
718     OCStackApplicationResult setResourceCallback(void* ctx,
719                                                  OCDoHandle /*handle*/,
720         OCClientResponse* clientResponse)
721     {
722         ClientCallbackContext::SetContext* context =
723             static_cast<ClientCallbackContext::SetContext*>(ctx);
724         OCRepresentation attrs;
725         HeaderOptions serverHeaderOptions;
726
727         OCStackResult result = clientResponse->result;
728         if (OC_STACK_OK               == result ||
729             OC_STACK_RESOURCE_CREATED == result ||
730             OC_STACK_RESOURCE_DELETED == result ||
731             OC_STACK_RESOURCE_CHANGED == result)
732         {
733             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
734             try
735             {
736                 attrs = parseGetSetCallback(clientResponse);
737             }
738             catch(OC::OCException& e)
739             {
740                 result = e.code();
741             }
742         }
743
744         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
745         exec.detach();
746         return OC_STACK_DELETE_TRANSACTION;
747     }
748
749     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
750         const QueryParamsMap& queryParams)
751     {
752         if (!uri.empty())
753         {
754             if (uri.back() == '/')
755             {
756                 uri.resize(uri.size() - 1);
757             }
758         }
759
760         ostringstream paramsList;
761         if (queryParams.size() > 0)
762         {
763             paramsList << '?';
764         }
765
766         for (auto& param : queryParams)
767         {
768             paramsList << param.first <<'='<<param.second<<';';
769         }
770
771         std::string queryString = paramsList.str();
772
773         if (queryString.empty())
774         {
775             return uri;
776         }
777
778         if (queryString.back() == ';')
779         {
780             queryString.resize(queryString.size() - 1);
781         }
782
783         std::string ret = uri + queryString;
784         return ret;
785     }
786
787     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
788         const QueryParamsList& queryParams)
789     {
790         if (!uri.empty())
791         {
792             if (uri.back() == '/')
793             {
794                 uri.resize(uri.size() - 1);
795             }
796         }
797
798         ostringstream paramsList;
799         if (queryParams.size() > 0)
800         {
801             paramsList << '?';
802         }
803
804         for (auto& param : queryParams)
805         {
806             for (auto& paramList : param.second)
807             {
808                 paramsList << param.first << '=' << paramList;
809                 if (paramList != param.second.back())
810                 {
811                     paramsList << '&';
812                 }
813             }
814             paramsList << ';';
815         }
816
817         std::string queryString = paramsList.str();
818
819         if (queryString.empty())
820         {
821             return uri;
822         }
823
824         if (queryString.back() == ';')
825         {
826             queryString.resize(queryString.size() - 1);
827         }
828
829         std::string ret = uri + queryString;
830         return ret;
831     }
832
833     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
834     {
835         MessageContainer ocInfo;
836         ocInfo.addRepresentation(rep);
837         for(const OCRepresentation& r : rep.getChildren())
838         {
839             ocInfo.addRepresentation(r);
840         }
841
842         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
843     }
844
845     OCStackResult InProcClientWrapper::PostResourceRepresentation(
846         const OCDevAddr& devAddr,
847         const std::string& uri,
848         const OCRepresentation& rep,
849         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
850         OCConnectivityType connectivityType,
851         PostCallback& callback, QualityOfService QoS)
852     {
853         if (!callback)
854         {
855             return OC_STACK_INVALID_PARAM;
856         }
857         OCStackResult result;
858         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
859         OCCallbackData cbdata;
860         cbdata.context = static_cast<void*>(ctx),
861         cbdata.cb      = setResourceCallback;
862         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
863
864
865         std::string url = assembleSetResourceUri(uri, queryParams);
866
867         auto cLock = m_csdkLock.lock();
868
869         if (cLock)
870         {
871             std::lock_guard<std::recursive_mutex> lock(*cLock);
872             OCHeaderOption options[MAX_HEADER_OPTIONS];
873
874             result = OCDoResource(nullptr, OC_REST_POST,
875                                   url.c_str(), &devAddr,
876                                   assembleSetResourcePayload(rep),
877                                   connectivityType,
878                                   static_cast<OCQualityOfService>(QoS),
879                                   &cbdata,
880                                   assembleHeaderOptions(options, headerOptions),
881                                   headerOptions.size());
882         }
883         else
884         {
885             delete ctx;
886             result = OC_STACK_ERROR;
887         }
888
889         return result;
890     }
891
892     OCStackResult InProcClientWrapper::PutResourceRepresentation(
893         const OCDevAddr& devAddr,
894         const std::string& uri,
895         const OCRepresentation& rep,
896         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
897         PutCallback& callback, QualityOfService QoS)
898     {
899         if (!callback)
900         {
901             return OC_STACK_INVALID_PARAM;
902         }
903         OCStackResult result;
904         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
905         OCCallbackData cbdata;
906         cbdata.context = static_cast<void*>(ctx),
907         cbdata.cb      = setResourceCallback;
908         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
909
910
911         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
912
913         auto cLock = m_csdkLock.lock();
914
915         if (cLock)
916         {
917             std::lock_guard<std::recursive_mutex> lock(*cLock);
918             OCDoHandle handle;
919             OCHeaderOption options[MAX_HEADER_OPTIONS];
920
921             result = OCDoResource(&handle, OC_REST_PUT,
922                                   url.c_str(), &devAddr,
923                                   assembleSetResourcePayload(rep),
924                                   CT_DEFAULT,
925                                   static_cast<OCQualityOfService>(QoS),
926                                   &cbdata,
927                                   assembleHeaderOptions(options, headerOptions),
928                                   headerOptions.size());
929         }
930         else
931         {
932             delete ctx;
933             result = OC_STACK_ERROR;
934         }
935
936         return result;
937     }
938
939     OCStackApplicationResult deleteResourceCallback(void* ctx,
940                                                     OCDoHandle /*handle*/,
941         OCClientResponse* clientResponse)
942     {
943         ClientCallbackContext::DeleteContext* context =
944             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
945         HeaderOptions serverHeaderOptions;
946
947         if (clientResponse->result == OC_STACK_OK)
948         {
949             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
950         }
951         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
952         exec.detach();
953         return OC_STACK_DELETE_TRANSACTION;
954     }
955
956     OCStackResult InProcClientWrapper::DeleteResource(
957         const OCDevAddr& devAddr,
958         const std::string& uri,
959         const HeaderOptions& headerOptions,
960         DeleteCallback& callback,
961         QualityOfService /*QoS*/)
962     {
963         if (!callback)
964         {
965             return OC_STACK_INVALID_PARAM;
966         }
967         OCStackResult result;
968         ClientCallbackContext::DeleteContext* ctx =
969             new ClientCallbackContext::DeleteContext(callback);
970         OCCallbackData cbdata;
971         cbdata.context = static_cast<void*>(ctx),
972         cbdata.cb      = deleteResourceCallback;
973         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
974
975
976         auto cLock = m_csdkLock.lock();
977
978         if (cLock)
979         {
980             OCHeaderOption options[MAX_HEADER_OPTIONS];
981
982             std::lock_guard<std::recursive_mutex> lock(*cLock);
983
984             result = OCDoResource(nullptr, OC_REST_DELETE,
985                                   uri.c_str(), &devAddr,
986                                   nullptr,
987                                   CT_DEFAULT,
988                                   static_cast<OCQualityOfService>(m_cfg.QoS),
989                                   &cbdata,
990                                   assembleHeaderOptions(options, headerOptions),
991                                   headerOptions.size());
992         }
993         else
994         {
995             delete ctx;
996             result = OC_STACK_ERROR;
997         }
998
999         return result;
1000     }
1001
1002     OCStackApplicationResult observeResourceCallback(void* ctx,
1003                                                      OCDoHandle /*handle*/,
1004         OCClientResponse* clientResponse)
1005     {
1006         ClientCallbackContext::ObserveContext* context =
1007             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
1008         OCRepresentation attrs;
1009         HeaderOptions serverHeaderOptions;
1010         uint32_t sequenceNumber = clientResponse->sequenceNumber;
1011         OCStackResult result = clientResponse->result;
1012         if (clientResponse->result == OC_STACK_OK)
1013         {
1014             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
1015             try
1016             {
1017                 attrs = parseGetSetCallback(clientResponse);
1018             }
1019             catch(OC::OCException& e)
1020             {
1021                 result = e.code();
1022             }
1023         }
1024         std::thread exec(context->callback, serverHeaderOptions, attrs,
1025                     result, sequenceNumber);
1026         exec.detach();
1027         if (sequenceNumber == OC_OBSERVE_DEREGISTER)
1028         {
1029             return OC_STACK_DELETE_TRANSACTION;
1030         }
1031         return OC_STACK_KEEP_TRANSACTION;
1032     }
1033
1034     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
1035         const OCDevAddr& devAddr,
1036         const std::string& uri,
1037         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
1038         ObserveCallback& callback, QualityOfService QoS)
1039     {
1040         if (!callback)
1041         {
1042             return OC_STACK_INVALID_PARAM;
1043         }
1044         OCStackResult result;
1045
1046         ClientCallbackContext::ObserveContext* ctx =
1047             new ClientCallbackContext::ObserveContext(callback);
1048         OCCallbackData cbdata;
1049         cbdata.context = static_cast<void*>(ctx),
1050         cbdata.cb      = observeResourceCallback;
1051         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1052
1053
1054         OCMethod method;
1055         if (observeType == ObserveType::Observe)
1056         {
1057             method = OC_REST_OBSERVE;
1058         }
1059         else if (observeType == ObserveType::ObserveAll)
1060         {
1061             method = OC_REST_OBSERVE_ALL;
1062         }
1063         else
1064         {
1065             method = OC_REST_OBSERVE_ALL;
1066         }
1067
1068         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
1069
1070         auto cLock = m_csdkLock.lock();
1071
1072         if (cLock)
1073         {
1074             std::lock_guard<std::recursive_mutex> lock(*cLock);
1075             OCHeaderOption options[MAX_HEADER_OPTIONS];
1076
1077             result = OCDoResource(handle, method,
1078                                   url.c_str(), &devAddr,
1079                                   nullptr,
1080                                   CT_DEFAULT,
1081                                   static_cast<OCQualityOfService>(QoS),
1082                                   &cbdata,
1083                                   assembleHeaderOptions(options, headerOptions),
1084                                   headerOptions.size());
1085         }
1086         else
1087         {
1088             delete ctx;
1089             return OC_STACK_ERROR;
1090         }
1091
1092         return result;
1093     }
1094
1095     OCStackResult InProcClientWrapper::CancelObserveResource(
1096             OCDoHandle handle,
1097             const std::string& /*host*/,
1098             const std::string& /*uri*/,
1099             const HeaderOptions& headerOptions,
1100             QualityOfService QoS)
1101     {
1102         OCStackResult result;
1103         auto cLock = m_csdkLock.lock();
1104
1105         if (cLock)
1106         {
1107             std::lock_guard<std::recursive_mutex> lock(*cLock);
1108             OCHeaderOption options[MAX_HEADER_OPTIONS];
1109
1110             result = OCCancel(handle,
1111                     static_cast<OCQualityOfService>(QoS),
1112                     assembleHeaderOptions(options, headerOptions),
1113                     headerOptions.size());
1114         }
1115         else
1116         {
1117             result = OC_STACK_ERROR;
1118         }
1119
1120         return result;
1121     }
1122
1123     OCStackApplicationResult subscribePresenceCallback(void* ctx,
1124                                                        OCDoHandle /*handle*/,
1125             OCClientResponse* clientResponse)
1126     {
1127         ClientCallbackContext::SubscribePresenceContext* context =
1128         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
1129
1130         /*
1131          * This a hack while we rethink presence subscription.
1132          */
1133         std::string url = clientResponse->devAddr.addr;
1134
1135         std::thread exec(context->callback, clientResponse->result,
1136                     clientResponse->sequenceNumber, url);
1137
1138         exec.detach();
1139
1140         return OC_STACK_KEEP_TRANSACTION;
1141     }
1142
1143     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
1144         const std::string& host, const std::string& resourceType,
1145         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
1146     {
1147         if (!presenceHandler)
1148         {
1149             return OC_STACK_INVALID_PARAM;
1150         }
1151
1152         ClientCallbackContext::SubscribePresenceContext* ctx =
1153             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
1154         OCCallbackData cbdata;
1155         cbdata.context = static_cast<void*>(ctx),
1156         cbdata.cb      = subscribePresenceCallback;
1157         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
1158
1159
1160         auto cLock = m_csdkLock.lock();
1161
1162         std::ostringstream os;
1163         os << host << OC_RSRVD_PRESENCE_URI;
1164
1165         if (!resourceType.empty())
1166         {
1167             os << "?rt=" << resourceType;
1168         }
1169
1170         if (!cLock)
1171         {
1172             delete ctx;
1173             return OC_STACK_ERROR;
1174         }
1175
1176         return OCDoResource(handle, OC_REST_PRESENCE,
1177                             os.str().c_str(), nullptr,
1178                             nullptr, connectivityType,
1179                             OC_LOW_QOS, &cbdata, NULL, 0);
1180     }
1181
1182     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
1183     {
1184         OCStackResult result;
1185         auto cLock = m_csdkLock.lock();
1186
1187         if (cLock)
1188         {
1189             std::lock_guard<std::recursive_mutex> lock(*cLock);
1190             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
1191         }
1192         else
1193         {
1194             result = OC_STACK_ERROR;
1195         }
1196
1197         return result;
1198     }
1199
1200     OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
1201                                                                const std::string& host,
1202                                                                const QueryParamsList& queryParams,
1203                                                                OCConnectivityType connectivityType,
1204                                                                ObserveCallback& callback)
1205     {
1206         if (!callback)
1207         {
1208             return OC_STACK_INVALID_PARAM;
1209         }
1210         OCStackResult result;
1211
1212         ClientCallbackContext::ObserveContext* ctx =
1213             new ClientCallbackContext::ObserveContext(callback);
1214         OCCallbackData cbdata;
1215         cbdata.context = static_cast<void*>(ctx),
1216         cbdata.cb      = observeResourceCallback;
1217         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1218
1219         auto cLock = m_csdkLock.lock();
1220
1221         if (cLock)
1222         {
1223             std::lock_guard<std::recursive_mutex> lock(*cLock);
1224
1225             std::ostringstream os;
1226             os << host << OCF_RSRVD_DEVICE_PRESENCE_URI;
1227             std::string url = assembleSetResourceUri(os.str(), queryParams);
1228
1229             result = OCDoResource(handle, OC_REST_OBSERVE,
1230                                   url.c_str(), nullptr,
1231                                   nullptr, connectivityType,
1232                                   OC_LOW_QOS, &cbdata,
1233                                   nullptr, 0);
1234         }
1235         else
1236         {
1237             delete ctx;
1238             result = OC_STACK_ERROR;
1239         }
1240
1241         return result;
1242     }
1243
1244     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
1245     {
1246         qos = m_cfg.QoS;
1247         return OC_STACK_OK;
1248     }
1249
1250     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
1251            const HeaderOptions& headerOptions)
1252     {
1253         int i = 0;
1254
1255         if ( headerOptions.size() == 0)
1256         {
1257             return nullptr;
1258         }
1259
1260         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
1261         {
1262             options[i] = OCHeaderOption();
1263             options[i].protocolID = OC_COAP_ID;
1264             options[i].optionID = it->getOptionID();
1265             options[i].optionLength = it->getOptionData().length() + 1;
1266             strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));
1267             i++;
1268         }
1269
1270         return options;
1271     }
1272
1273     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
1274     {
1275         if (!dev)
1276         {
1277             return nullptr;
1278         }
1279
1280         OCDPDev_t* result = new OCDPDev_t(*dev);
1281         result->prm = new OCPrm_t[dev->prmLen];
1282         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
1283         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
1284     }
1285
1286     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
1287     {
1288         while(list)
1289         {
1290             dpList.push_back(cloneDevice(list));
1291             list = list->next;
1292         }
1293     }
1294
1295     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
1296             GetDirectPairedCallback& callback)
1297     {
1298         if (!callback || 0 == waittime)
1299         {
1300             return OC_STACK_INVALID_PARAM;
1301         }
1302
1303         OCStackResult result = OC_STACK_ERROR;
1304         const OCDPDev_t *list = nullptr;
1305         PairedDevices dpDeviceList;
1306
1307         auto cLock = m_csdkLock.lock();
1308
1309         if (cLock)
1310         {
1311             std::lock_guard<std::recursive_mutex> lock(*cLock);
1312
1313             list = OCDiscoverDirectPairingDevices(waittime);
1314             if (NULL == list)
1315             {
1316                 result = OC_STACK_NO_RESOURCE;
1317                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1318                     << std::flush;
1319             }
1320             else {
1321                 convert(list, dpDeviceList);
1322                 std::thread exec(callback, dpDeviceList);
1323                 exec.detach();
1324                 result = OC_STACK_OK;
1325             }
1326         }
1327         else
1328         {
1329             result = OC_STACK_ERROR;
1330         }
1331
1332         return result;
1333     }
1334
1335     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1336     {
1337         if (!callback)
1338         {
1339             return OC_STACK_INVALID_PARAM;
1340         }
1341
1342         OCStackResult result = OC_STACK_ERROR;
1343         const OCDPDev_t *list = nullptr;
1344         PairedDevices dpDeviceList;
1345
1346         auto cLock = m_csdkLock.lock();
1347
1348         if (cLock)
1349         {
1350             std::lock_guard<std::recursive_mutex> lock(*cLock);
1351
1352             list = OCGetDirectPairedDevices();
1353             if (NULL == list)
1354             {
1355                 result = OC_STACK_NO_RESOURCE;
1356                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1357                     << std::flush;
1358             }
1359             else {
1360                 convert(list, dpDeviceList);
1361                 std::thread exec(callback, dpDeviceList);
1362                 exec.detach();
1363                 result = OC_STACK_OK;
1364             }
1365         }
1366         else
1367         {
1368             result = OC_STACK_ERROR;
1369         }
1370
1371         return result;
1372     }
1373
1374     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1375             OCStackResult result)
1376     {
1377
1378         ClientCallbackContext::DirectPairingContext* context =
1379             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1380
1381         std::thread exec(context->callback, cloneDevice(peer), result);
1382         exec.detach();
1383     }
1384
1385     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1386             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1387     {
1388         if (!peer || !callback)
1389         {
1390             oclog() << "Invalid parameters" << std::flush;
1391             return OC_STACK_INVALID_PARAM;
1392         }
1393
1394         OCStackResult result = OC_STACK_ERROR;
1395         ClientCallbackContext::DirectPairingContext* context =
1396             new ClientCallbackContext::DirectPairingContext(callback);
1397
1398         auto cLock = m_csdkLock.lock();
1399         if (cLock)
1400         {
1401             std::lock_guard<std::recursive_mutex> lock(*cLock);
1402             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1403                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1404         }
1405         else
1406         {
1407             delete context;
1408             result = OC_STACK_ERROR;
1409         }
1410         return result;
1411     }
1412 }