Merge "Merge remote-tracking branch 'origin/master' into notification-service" into...
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try{
169             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
170                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
171             // loop to ensure valid construction of all resources
172
173             for(auto resource : container.Resources())
174             {
175                 std::thread exec(context->callback, resource);
176                 exec.detach();
177             }
178         }
179         catch (std::exception &e){
180             oclog() << "Exception in listCallback, ignoring response: "
181                     << e.what() << std::flush;
182         }
183
184
185         return OC_STACK_KEEP_TRANSACTION;
186     }
187
188     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
189         OCClientResponse* clientResponse)
190     {
191         if (!ctx || !clientResponse)
192         {
193             return OC_STACK_KEEP_TRANSACTION;
194         }
195
196         ClientCallbackContext::ListenErrorContext* context =
197             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
198         if (!context)
199         {
200             return OC_STACK_KEEP_TRANSACTION;
201         }
202
203         OCStackResult result = clientResponse->result;
204         if (result == OC_STACK_OK)
205         {
206             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
207             {
208                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
209                     << std::flush;
210                 return OC_STACK_KEEP_TRANSACTION;
211             }
212
213             auto clientWrapper = context->clientWrapper.lock();
214
215             if (!clientWrapper)
216             {
217                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
218                         << std::flush;
219                 return OC_STACK_KEEP_TRANSACTION;
220             }
221
222             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
223                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
224             // loop to ensure valid construction of all resources
225             for (auto resource : container.Resources())
226             {
227                 std::thread exec(context->callback, resource);
228                 exec.detach();
229             }
230             return OC_STACK_KEEP_TRANSACTION;
231         }
232
233         std::string resourceURI = clientResponse->resourceUri;
234         std::thread exec(context->errorCallback, resourceURI, result);
235         exec.detach();
236         return OC_STACK_DELETE_TRANSACTION;
237     }
238
239     OCStackResult InProcClientWrapper::ListenForResource(
240             const std::string& serviceUrl,
241             const std::string& resourceType,
242             OCConnectivityType connectivityType,
243             FindCallback& callback, QualityOfService QoS)
244     {
245         if (!callback)
246         {
247             return OC_STACK_INVALID_PARAM;
248         }
249
250         OCStackResult result;
251         ostringstream resourceUri;
252         resourceUri << serviceUrl << resourceType;
253
254         ClientCallbackContext::ListenContext* context =
255             new ClientCallbackContext::ListenContext(callback, shared_from_this());
256         OCCallbackData cbdata;
257         cbdata.context = static_cast<void*>(context),
258         cbdata.cb      = listenCallback;
259         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
260
261         auto cLock = m_csdkLock.lock();
262         if (cLock)
263         {
264             std::lock_guard<std::recursive_mutex> lock(*cLock);
265             result = OCDoResource(nullptr, OC_REST_DISCOVER,
266                                   resourceUri.str().c_str(),
267                                   nullptr, nullptr, connectivityType,
268                                   static_cast<OCQualityOfService>(QoS),
269                                   &cbdata,
270                                   nullptr, 0);
271         }
272         else
273         {
274             delete context;
275             result = OC_STACK_ERROR;
276         }
277         return result;
278     }
279
280     OCStackResult InProcClientWrapper::ListenErrorForResource(
281             const std::string& serviceUrl,
282             const std::string& resourceType,
283             OCConnectivityType connectivityType,
284             FindCallback& callback, FindErrorCallback& errorCallback,
285             QualityOfService QoS)
286     {
287         if (!callback)
288         {
289             return OC_STACK_INVALID_PARAM;
290         }
291
292         ostringstream resourceUri;
293         resourceUri << serviceUrl << resourceType;
294
295         ClientCallbackContext::ListenErrorContext* context =
296             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
297                                                           shared_from_this());
298         if (!context)
299         {
300             return OC_STACK_ERROR;
301         }
302
303         OCCallbackData cbdata(
304                 static_cast<void*>(context),
305                 listenErrorCallback,
306                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
307             );
308
309         OCStackResult result;
310         auto cLock = m_csdkLock.lock();
311         if (cLock)
312         {
313             std::lock_guard<std::recursive_mutex> lock(*cLock);
314             result = OCDoResource(nullptr, OC_REST_DISCOVER,
315                                   resourceUri.str().c_str(),
316                                   nullptr, nullptr, connectivityType,
317                                   static_cast<OCQualityOfService>(QoS),
318                                   &cbdata,
319                                   nullptr, 0);
320         }
321         else
322         {
323             delete context;
324             result = OC_STACK_ERROR;
325         }
326         return result;
327     }
328 #ifdef WITH_MQ
329     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
330                                               OCClientResponse* clientResponse)
331     {
332         ClientCallbackContext::MQTopicContext* context =
333             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
334
335         if (!clientResponse || !context)
336         {
337             return OC_STACK_DELETE_TRANSACTION;
338         }
339
340         if (clientResponse->result != OC_STACK_OK)
341         {
342             oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
343                     << clientResponse->result
344                     << std::flush;
345
346             std::thread exec(context->callback, clientResponse->result,
347                              clientResponse->resourceUri, nullptr);
348             exec.detach();
349
350             return OC_STACK_DELETE_TRANSACTION;
351         }
352
353         auto clientWrapper = context->clientWrapper.lock();
354         if (!clientWrapper)
355         {
356             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
357                     << std::flush;
358             return OC_STACK_DELETE_TRANSACTION;
359         }
360
361         try{
362             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
363                                         (OCRepPayload *) clientResponse->payload);
364
365             // loop to ensure valid construction of all resources
366             for (auto resource : container.Resources())
367             {
368                 std::thread exec(context->callback, clientResponse->result,
369                                  clientResponse->resourceUri, resource);
370                 exec.detach();
371             }
372         }
373         catch (std::exception &e){
374             oclog() << "Exception in listCallback, ignoring response: "
375                     << e.what() << std::flush;
376         }
377
378         return OC_STACK_DELETE_TRANSACTION;
379     }
380
381     OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
382                                                         const std::string& resourceUri,
383                                                         const QueryParamsMap& queryParams,
384                                                         const HeaderOptions& headerOptions,
385                                                         MQTopicCallback& callback,
386                                                         QualityOfService QoS)
387     {
388         oclog() << "ListenForMQTopic()" << std::flush;
389
390         if (!callback)
391         {
392             return OC_STACK_INVALID_PARAM;
393         }
394
395         ClientCallbackContext::MQTopicContext* context =
396             new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
397         OCCallbackData cbdata;
398         cbdata.context = static_cast<void*>(context),
399         cbdata.cb      = listenMQCallback;
400         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
401
402         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
403
404         OCStackResult result = OC_STACK_ERROR;
405         auto cLock = m_csdkLock.lock();
406         if (cLock)
407         {
408             std::lock_guard<std::recursive_mutex> lock(*cLock);
409             OCHeaderOption options[MAX_HEADER_OPTIONS];
410             result = OCDoResource(
411                                   nullptr, OC_REST_GET,
412                                   uri.c_str(),
413                                   &devAddr, nullptr,
414                                   CT_DEFAULT,
415                                   static_cast<OCQualityOfService>(QoS),
416                                   &cbdata,
417                                   assembleHeaderOptions(options, headerOptions),
418                                   headerOptions.size());
419         }
420         else
421         {
422             delete context;
423         }
424
425         return result;
426     }
427 #endif
428
429     OCStackApplicationResult listenDeviceCallback(void* ctx,
430                                                   OCDoHandle /*handle*/,
431             OCClientResponse* clientResponse)
432     {
433         ClientCallbackContext::DeviceListenContext* context =
434             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
435
436         try
437         {
438             OCRepresentation rep = parseGetSetCallback(clientResponse);
439             std::thread exec(context->callback, rep);
440             exec.detach();
441         }
442         catch(OC::OCException& e)
443         {
444             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
445                 <<e.what() <<std::flush;
446         }
447
448         return OC_STACK_KEEP_TRANSACTION;
449     }
450
451     OCStackResult InProcClientWrapper::ListenForDevice(
452             const std::string& serviceUrl,
453             const std::string& deviceURI,
454             OCConnectivityType connectivityType,
455             FindDeviceCallback& callback,
456             QualityOfService QoS)
457     {
458         if (!callback)
459         {
460             return OC_STACK_INVALID_PARAM;
461         }
462         OCStackResult result;
463         ostringstream deviceUri;
464         deviceUri << serviceUrl << deviceURI;
465
466         ClientCallbackContext::DeviceListenContext* context =
467             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
468         OCCallbackData cbdata;
469
470         cbdata.context = static_cast<void*>(context),
471         cbdata.cb      = listenDeviceCallback;
472         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
473
474         auto cLock = m_csdkLock.lock();
475         if (cLock)
476         {
477             std::lock_guard<std::recursive_mutex> lock(*cLock);
478             result = OCDoResource(nullptr, OC_REST_DISCOVER,
479                                   deviceUri.str().c_str(),
480                                   nullptr, nullptr, connectivityType,
481                                   static_cast<OCQualityOfService>(QoS),
482                                   &cbdata,
483                                   nullptr, 0);
484         }
485         else
486         {
487             delete context;
488             result = OC_STACK_ERROR;
489         }
490         return result;
491     }
492
493     void parseServerHeaderOptions(OCClientResponse* clientResponse,
494                     HeaderOptions& serverHeaderOptions)
495     {
496         if (clientResponse)
497         {
498             // Parse header options from server
499             uint16_t optionID;
500             std::string optionData;
501
502             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
503             {
504                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
505                 optionData = reinterpret_cast<const char*>
506                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
507                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
508                 serverHeaderOptions.push_back(headerOption);
509             }
510         }
511         else
512         {
513             // clientResponse is invalid
514             // TODO check proper logging
515             std::cout << " Invalid response " << std::endl;
516         }
517     }
518
519 #ifdef WITH_MQ
520     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
521                     OCClientResponse* clientResponse)
522     {
523         ClientCallbackContext::MQTopicContext* context =
524             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
525         HeaderOptions serverHeaderOptions;
526
527         if (!clientResponse || !context)
528         {
529             return OC_STACK_DELETE_TRANSACTION;
530         }
531
532         std::string createdUri;
533         bool isLocationOption = false;
534         OCStackResult result = clientResponse->result;
535         if (OC_STACK_OK               == result ||
536             OC_STACK_RESOURCE_CREATED == result)
537         {
538             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
539
540             for (auto headerOption : serverHeaderOptions)
541             {
542                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
543                 {
544                     createdUri += "/";
545                     createdUri += headerOption.getOptionData();
546                     if (!isLocationOption)
547                     {
548                         isLocationOption = true;
549                     }
550                 }
551             }
552         }
553
554         if (!isLocationOption)
555         {
556             createdUri = clientResponse->resourceUri;
557         }
558
559         auto clientWrapper = context->clientWrapper.lock();
560
561         if (!clientWrapper)
562         {
563             oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
564                     << std::flush;
565             return OC_STACK_DELETE_TRANSACTION;
566         }
567
568         try{
569             if (OC_STACK_OK               == result ||
570                 OC_STACK_RESOURCE_CREATED == result)
571             {
572                 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
573                                             createdUri);
574                 for (auto resource : container.Resources())
575                 {
576                     std::thread exec(context->callback, result, createdUri, resource);
577                     exec.detach();
578                 }
579             }
580             else
581             {
582                 std::thread exec(context->callback, result, createdUri, nullptr);
583                 exec.detach();
584             }
585         }
586         catch (std::exception &e){
587             oclog() << "Exception in createMQTopicCallback, ignoring response: "
588                     << e.what() << std::flush;
589         }
590         return OC_STACK_DELETE_TRANSACTION;
591     }
592
593     OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
594                 const OCDevAddr& devAddr,
595                 const std::string& uri,
596                 const OCRepresentation& rep,
597                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
598                 MQTopicCallback& callback, QualityOfService QoS)
599     {
600         if (!callback)
601         {
602             return OC_STACK_INVALID_PARAM;
603         }
604         OCStackResult result;
605         ClientCallbackContext::MQTopicContext* ctx =
606                 new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
607         OCCallbackData cbdata;
608         cbdata.context = static_cast<void*>(ctx),
609         cbdata.cb      = createMQTopicCallback;
610         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
611
612         std::string url = assembleSetResourceUri(uri, queryParams);
613
614         auto cLock = m_csdkLock.lock();
615
616         if (cLock)
617         {
618             std::lock_guard<std::recursive_mutex> lock(*cLock);
619             OCHeaderOption options[MAX_HEADER_OPTIONS];
620
621             result = OCDoResource(nullptr, OC_REST_PUT,
622                                   url.c_str(), &devAddr,
623                                   assembleSetResourcePayload(rep),
624                                   CT_DEFAULT,
625                                   static_cast<OCQualityOfService>(QoS),
626                                   &cbdata,
627                                   assembleHeaderOptions(options, headerOptions),
628                                   headerOptions.size());
629         }
630         else
631         {
632             delete ctx;
633             result = OC_STACK_ERROR;
634         }
635
636         return result;
637     }
638 #endif
639     OCStackApplicationResult getResourceCallback(void* ctx,
640                                                  OCDoHandle /*handle*/,
641         OCClientResponse* clientResponse)
642     {
643         ClientCallbackContext::GetContext* context =
644             static_cast<ClientCallbackContext::GetContext*>(ctx);
645
646         OCRepresentation rep;
647         HeaderOptions serverHeaderOptions;
648         OCStackResult result = clientResponse->result;
649         if (result == OC_STACK_OK)
650         {
651             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
652             try
653             {
654                 rep = parseGetSetCallback(clientResponse);
655             }
656             catch(OC::OCException& e)
657             {
658                 result = e.code();
659             }
660         }
661
662         std::thread exec(context->callback, serverHeaderOptions, rep, result);
663         exec.detach();
664         return OC_STACK_DELETE_TRANSACTION;
665     }
666
667     OCStackResult InProcClientWrapper::GetResourceRepresentation(
668         const OCDevAddr& devAddr,
669         const std::string& resourceUri,
670         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
671         OCConnectivityType connectivityType,
672         GetCallback& callback, QualityOfService QoS)
673     {
674         if (!callback)
675         {
676             return OC_STACK_INVALID_PARAM;
677         }
678         OCStackResult result;
679         ClientCallbackContext::GetContext* ctx =
680             new ClientCallbackContext::GetContext(callback);
681         OCCallbackData cbdata;
682         cbdata.context = static_cast<void*>(ctx),
683         cbdata.cb      = getResourceCallback;
684         cbdata.cd      = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
685
686
687         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
688
689         auto cLock = m_csdkLock.lock();
690
691         if (cLock)
692         {
693             std::lock_guard<std::recursive_mutex> lock(*cLock);
694             OCHeaderOption options[MAX_HEADER_OPTIONS];
695
696             result = OCDoResource(
697                                   nullptr, OC_REST_GET,
698                                   uri.c_str(),
699                                   &devAddr, nullptr,
700                                   connectivityType,
701                                   static_cast<OCQualityOfService>(QoS),
702                                   &cbdata,
703                                   assembleHeaderOptions(options, headerOptions),
704                                   headerOptions.size());
705         }
706         else
707         {
708             delete ctx;
709             result = OC_STACK_ERROR;
710         }
711         return result;
712     }
713
714
715     OCStackApplicationResult setResourceCallback(void* ctx,
716                                                  OCDoHandle /*handle*/,
717         OCClientResponse* clientResponse)
718     {
719         ClientCallbackContext::SetContext* context =
720             static_cast<ClientCallbackContext::SetContext*>(ctx);
721         OCRepresentation attrs;
722         HeaderOptions serverHeaderOptions;
723
724         OCStackResult result = clientResponse->result;
725         if (OC_STACK_OK               == result ||
726             OC_STACK_RESOURCE_CREATED == result ||
727             OC_STACK_RESOURCE_DELETED == result ||
728             OC_STACK_RESOURCE_CHANGED == result)
729         {
730             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
731             try
732             {
733                 attrs = parseGetSetCallback(clientResponse);
734             }
735             catch(OC::OCException& e)
736             {
737                 result = e.code();
738             }
739         }
740
741         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
742         exec.detach();
743         return OC_STACK_DELETE_TRANSACTION;
744     }
745
746     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
747         const QueryParamsMap& queryParams)
748     {
749         if (!uri.empty())
750         {
751             if (uri.back() == '/')
752             {
753                 uri.resize(uri.size() - 1);
754             }
755         }
756
757         ostringstream paramsList;
758         if (queryParams.size() > 0)
759         {
760             paramsList << '?';
761         }
762
763         for (auto& param : queryParams)
764         {
765             paramsList << param.first <<'='<<param.second<<';';
766         }
767
768         std::string queryString = paramsList.str();
769
770         if (queryString.empty())
771         {
772             return uri;
773         }
774
775         if (queryString.back() == ';')
776         {
777             queryString.resize(queryString.size() - 1);
778         }
779
780         std::string ret = uri + queryString;
781         return ret;
782     }
783
784     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
785         const QueryParamsList& queryParams)
786     {
787         if (!uri.empty())
788         {
789             if (uri.back() == '/')
790             {
791                 uri.resize(uri.size() - 1);
792             }
793         }
794
795         ostringstream paramsList;
796         if (queryParams.size() > 0)
797         {
798             paramsList << '?';
799         }
800
801         for (auto& param : queryParams)
802         {
803             for (auto& paramList : param.second)
804             {
805                 paramsList << param.first << '=' << paramList << ';';
806             }
807         }
808
809         std::string queryString = paramsList.str();
810
811         if (queryString.empty())
812         {
813             return uri;
814         }
815
816         if (queryString.back() == ';')
817         {
818             queryString.resize(queryString.size() - 1);
819         }
820
821         std::string ret = uri + queryString;
822         return ret;
823     }
824
825     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
826     {
827         MessageContainer ocInfo;
828         ocInfo.addRepresentation(rep);
829         for(const OCRepresentation& r : rep.getChildren())
830         {
831             ocInfo.addRepresentation(r);
832         }
833
834         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
835     }
836
837     OCStackResult InProcClientWrapper::PostResourceRepresentation(
838         const OCDevAddr& devAddr,
839         const std::string& uri,
840         const OCRepresentation& rep,
841         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
842         OCConnectivityType connectivityType,
843         PostCallback& callback, QualityOfService QoS)
844     {
845         if (!callback)
846         {
847             return OC_STACK_INVALID_PARAM;
848         }
849         OCStackResult result;
850         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
851         OCCallbackData cbdata;
852         cbdata.context = static_cast<void*>(ctx),
853         cbdata.cb      = setResourceCallback;
854         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
855
856
857         std::string url = assembleSetResourceUri(uri, queryParams);
858
859         auto cLock = m_csdkLock.lock();
860
861         if (cLock)
862         {
863             std::lock_guard<std::recursive_mutex> lock(*cLock);
864             OCHeaderOption options[MAX_HEADER_OPTIONS];
865
866             result = OCDoResource(nullptr, OC_REST_POST,
867                                   url.c_str(), &devAddr,
868                                   assembleSetResourcePayload(rep),
869                                   connectivityType,
870                                   static_cast<OCQualityOfService>(QoS),
871                                   &cbdata,
872                                   assembleHeaderOptions(options, headerOptions),
873                                   headerOptions.size());
874         }
875         else
876         {
877             delete ctx;
878             result = OC_STACK_ERROR;
879         }
880
881         return result;
882     }
883
884     OCStackResult InProcClientWrapper::PutResourceRepresentation(
885         const OCDevAddr& devAddr,
886         const std::string& uri,
887         const OCRepresentation& rep,
888         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
889         PutCallback& callback, QualityOfService QoS)
890     {
891         if (!callback)
892         {
893             return OC_STACK_INVALID_PARAM;
894         }
895         OCStackResult result;
896         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
897         OCCallbackData cbdata;
898         cbdata.context = static_cast<void*>(ctx),
899         cbdata.cb      = setResourceCallback;
900         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
901
902
903         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
904
905         auto cLock = m_csdkLock.lock();
906
907         if (cLock)
908         {
909             std::lock_guard<std::recursive_mutex> lock(*cLock);
910             OCDoHandle handle;
911             OCHeaderOption options[MAX_HEADER_OPTIONS];
912
913             result = OCDoResource(&handle, OC_REST_PUT,
914                                   url.c_str(), &devAddr,
915                                   assembleSetResourcePayload(rep),
916                                   CT_DEFAULT,
917                                   static_cast<OCQualityOfService>(QoS),
918                                   &cbdata,
919                                   assembleHeaderOptions(options, headerOptions),
920                                   headerOptions.size());
921         }
922         else
923         {
924             delete ctx;
925             result = OC_STACK_ERROR;
926         }
927
928         return result;
929     }
930
931     OCStackApplicationResult deleteResourceCallback(void* ctx,
932                                                     OCDoHandle /*handle*/,
933         OCClientResponse* clientResponse)
934     {
935         ClientCallbackContext::DeleteContext* context =
936             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
937         HeaderOptions serverHeaderOptions;
938
939         if (clientResponse->result == OC_STACK_OK)
940         {
941             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
942         }
943         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
944         exec.detach();
945         return OC_STACK_DELETE_TRANSACTION;
946     }
947
948     OCStackResult InProcClientWrapper::DeleteResource(
949         const OCDevAddr& devAddr,
950         const std::string& uri,
951         const HeaderOptions& headerOptions,
952         OCConnectivityType connectivityType,
953         DeleteCallback& callback,
954         QualityOfService /*QoS*/)
955     {
956         if (!callback)
957         {
958             return OC_STACK_INVALID_PARAM;
959         }
960         OCStackResult result;
961         ClientCallbackContext::DeleteContext* ctx =
962             new ClientCallbackContext::DeleteContext(callback);
963         OCCallbackData cbdata;
964         cbdata.context = static_cast<void*>(ctx),
965         cbdata.cb      = deleteResourceCallback;
966         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
967
968
969         auto cLock = m_csdkLock.lock();
970
971         if (cLock)
972         {
973             OCHeaderOption options[MAX_HEADER_OPTIONS];
974
975             std::lock_guard<std::recursive_mutex> lock(*cLock);
976
977             result = OCDoResource(nullptr, OC_REST_DELETE,
978                                   uri.c_str(), &devAddr,
979                                   nullptr,
980                                   connectivityType,
981                                   static_cast<OCQualityOfService>(m_cfg.QoS),
982                                   &cbdata,
983                                   assembleHeaderOptions(options, headerOptions),
984                                   headerOptions.size());
985         }
986         else
987         {
988             delete ctx;
989             result = OC_STACK_ERROR;
990         }
991
992         return result;
993     }
994
995     OCStackApplicationResult observeResourceCallback(void* ctx,
996                                                      OCDoHandle /*handle*/,
997         OCClientResponse* clientResponse)
998     {
999         ClientCallbackContext::ObserveContext* context =
1000             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
1001         OCRepresentation attrs;
1002         HeaderOptions serverHeaderOptions;
1003         uint32_t sequenceNumber = clientResponse->sequenceNumber;
1004         OCStackResult result = clientResponse->result;
1005         if (clientResponse->result == OC_STACK_OK)
1006         {
1007             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
1008             try
1009             {
1010                 attrs = parseGetSetCallback(clientResponse);
1011             }
1012             catch(OC::OCException& e)
1013             {
1014                 result = e.code();
1015             }
1016         }
1017         std::thread exec(context->callback, serverHeaderOptions, attrs,
1018                     result, sequenceNumber);
1019         exec.detach();
1020
1021         return OC_STACK_KEEP_TRANSACTION;
1022     }
1023
1024     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
1025         const OCDevAddr& devAddr,
1026         const std::string& uri,
1027         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
1028         ObserveCallback& callback, QualityOfService QoS)
1029     {
1030         if (!callback)
1031         {
1032             return OC_STACK_INVALID_PARAM;
1033         }
1034         OCStackResult result;
1035
1036         ClientCallbackContext::ObserveContext* ctx =
1037             new ClientCallbackContext::ObserveContext(callback);
1038         OCCallbackData cbdata;
1039         cbdata.context = static_cast<void*>(ctx),
1040         cbdata.cb      = observeResourceCallback;
1041         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1042
1043
1044         OCMethod method;
1045         if (observeType == ObserveType::Observe)
1046         {
1047             method = OC_REST_OBSERVE;
1048         }
1049         else if (observeType == ObserveType::ObserveAll)
1050         {
1051             method = OC_REST_OBSERVE_ALL;
1052         }
1053         else
1054         {
1055             method = OC_REST_OBSERVE_ALL;
1056         }
1057
1058         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
1059
1060         auto cLock = m_csdkLock.lock();
1061
1062         if (cLock)
1063         {
1064             std::lock_guard<std::recursive_mutex> lock(*cLock);
1065             OCHeaderOption options[MAX_HEADER_OPTIONS];
1066
1067             result = OCDoResource(handle, method,
1068                                   url.c_str(), &devAddr,
1069                                   nullptr,
1070                                   CT_DEFAULT,
1071                                   static_cast<OCQualityOfService>(QoS),
1072                                   &cbdata,
1073                                   assembleHeaderOptions(options, headerOptions),
1074                                   headerOptions.size());
1075         }
1076         else
1077         {
1078             delete ctx;
1079             return OC_STACK_ERROR;
1080         }
1081
1082         return result;
1083     }
1084
1085     OCStackResult InProcClientWrapper::CancelObserveResource(
1086             OCDoHandle handle,
1087             const std::string& /*host*/,
1088             const std::string& /*uri*/,
1089             const HeaderOptions& headerOptions,
1090             QualityOfService QoS)
1091     {
1092         OCStackResult result;
1093         auto cLock = m_csdkLock.lock();
1094
1095         if (cLock)
1096         {
1097             std::lock_guard<std::recursive_mutex> lock(*cLock);
1098             OCHeaderOption options[MAX_HEADER_OPTIONS];
1099
1100             result = OCCancel(handle,
1101                     static_cast<OCQualityOfService>(QoS),
1102                     assembleHeaderOptions(options, headerOptions),
1103                     headerOptions.size());
1104         }
1105         else
1106         {
1107             result = OC_STACK_ERROR;
1108         }
1109
1110         return result;
1111     }
1112
1113     OCStackApplicationResult subscribePresenceCallback(void* ctx,
1114                                                        OCDoHandle /*handle*/,
1115             OCClientResponse* clientResponse)
1116     {
1117         ClientCallbackContext::SubscribePresenceContext* context =
1118         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
1119
1120         /*
1121          * This a hack while we rethink presence subscription.
1122          */
1123         std::string url = clientResponse->devAddr.addr;
1124
1125         std::thread exec(context->callback, clientResponse->result,
1126                     clientResponse->sequenceNumber, url);
1127
1128         exec.detach();
1129
1130         return OC_STACK_KEEP_TRANSACTION;
1131     }
1132
1133     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
1134         const std::string& host, const std::string& resourceType,
1135         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
1136     {
1137         if (!presenceHandler)
1138         {
1139             return OC_STACK_INVALID_PARAM;
1140         }
1141
1142         ClientCallbackContext::SubscribePresenceContext* ctx =
1143             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
1144         OCCallbackData cbdata;
1145         cbdata.context = static_cast<void*>(ctx),
1146         cbdata.cb      = subscribePresenceCallback;
1147         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
1148
1149
1150         auto cLock = m_csdkLock.lock();
1151
1152         std::ostringstream os;
1153         os << host << OC_RSRVD_PRESENCE_URI;
1154
1155         if (!resourceType.empty())
1156         {
1157             os << "?rt=" << resourceType;
1158         }
1159
1160         if (!cLock)
1161         {
1162             delete ctx;
1163             return OC_STACK_ERROR;
1164         }
1165
1166         return OCDoResource(handle, OC_REST_PRESENCE,
1167                             os.str().c_str(), nullptr,
1168                             nullptr, connectivityType,
1169                             OC_LOW_QOS, &cbdata, NULL, 0);
1170     }
1171
1172     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
1173     {
1174         OCStackResult result;
1175         auto cLock = m_csdkLock.lock();
1176
1177         if (cLock)
1178         {
1179             std::lock_guard<std::recursive_mutex> lock(*cLock);
1180             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
1181         }
1182         else
1183         {
1184             result = OC_STACK_ERROR;
1185         }
1186
1187         return result;
1188     }
1189
1190 #ifdef WITH_CLOUD
1191     OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
1192                                                                const std::string& host,
1193                                                                const std::vector<std::string>& di,
1194                                                                OCConnectivityType connectivityType,
1195                                                                ObserveCallback& callback)
1196     {
1197         if (!callback)
1198         {
1199             return OC_STACK_INVALID_PARAM;
1200         }
1201         OCStackResult result;
1202
1203         ClientCallbackContext::ObserveContext* ctx =
1204             new ClientCallbackContext::ObserveContext(callback);
1205         OCCallbackData cbdata;
1206         cbdata.context = static_cast<void*>(ctx),
1207         cbdata.cb      = observeResourceCallback;
1208         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1209
1210         auto cLock = m_csdkLock.lock();
1211
1212         if (cLock)
1213         {
1214             std::lock_guard<std::recursive_mutex> lock(*cLock);
1215
1216             std::ostringstream os;
1217             os << host << OCF_RSRVD_DEVICE_PRESENCE_URI;
1218             QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
1219             std::string url = assembleSetResourceUri(os.str(), queryParams);
1220
1221             result = OCDoResource(handle, OC_REST_OBSERVE,
1222                                   url.c_str(), nullptr,
1223                                   nullptr, connectivityType,
1224                                   OC_LOW_QOS, &cbdata,
1225                                   nullptr, 0);
1226         }
1227         else
1228         {
1229             delete ctx;
1230             result = OC_STACK_ERROR;
1231         }
1232
1233         return result;
1234     }
1235 #endif
1236
1237     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
1238     {
1239         qos = m_cfg.QoS;
1240         return OC_STACK_OK;
1241     }
1242
1243     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
1244            const HeaderOptions& headerOptions)
1245     {
1246         int i = 0;
1247
1248         if ( headerOptions.size() == 0)
1249         {
1250             return nullptr;
1251         }
1252
1253         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
1254         {
1255             options[i] = OCHeaderOption();
1256             options[i].protocolID = OC_COAP_ID;
1257             options[i].optionID = it->getOptionID();
1258             options[i].optionLength = it->getOptionData().length() + 1;
1259             strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));
1260             i++;
1261         }
1262
1263         return options;
1264     }
1265
1266     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
1267     {
1268         if (!dev)
1269         {
1270             return nullptr;
1271         }
1272
1273         OCDPDev_t* result = new OCDPDev_t(*dev);
1274         result->prm = new OCPrm_t[dev->prmLen];
1275         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
1276         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
1277     }
1278
1279     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
1280     {
1281         while(list)
1282         {
1283             dpList.push_back(cloneDevice(list));
1284             list = list->next;
1285         }
1286     }
1287
1288     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
1289             GetDirectPairedCallback& callback)
1290     {
1291         if (!callback || 0 == waittime)
1292         {
1293             return OC_STACK_INVALID_PARAM;
1294         }
1295
1296         OCStackResult result = OC_STACK_ERROR;
1297         const OCDPDev_t *list = nullptr;
1298         PairedDevices dpDeviceList;
1299
1300         auto cLock = m_csdkLock.lock();
1301
1302         if (cLock)
1303         {
1304             std::lock_guard<std::recursive_mutex> lock(*cLock);
1305
1306             list = OCDiscoverDirectPairingDevices(waittime);
1307             if (NULL == list)
1308             {
1309                 result = OC_STACK_NO_RESOURCE;
1310                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1311                     << std::flush;
1312             }
1313             else {
1314                 convert(list, dpDeviceList);
1315                 std::thread exec(callback, dpDeviceList);
1316                 exec.detach();
1317                 result = OC_STACK_OK;
1318             }
1319         }
1320         else
1321         {
1322             result = OC_STACK_ERROR;
1323         }
1324
1325         return result;
1326     }
1327
1328     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1329     {
1330         if (!callback)
1331         {
1332             return OC_STACK_INVALID_PARAM;
1333         }
1334
1335         OCStackResult result = OC_STACK_ERROR;
1336         const OCDPDev_t *list = nullptr;
1337         PairedDevices dpDeviceList;
1338
1339         auto cLock = m_csdkLock.lock();
1340
1341         if (cLock)
1342         {
1343             std::lock_guard<std::recursive_mutex> lock(*cLock);
1344
1345             list = OCGetDirectPairedDevices();
1346             if (NULL == list)
1347             {
1348                 result = OC_STACK_NO_RESOURCE;
1349                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1350                     << std::flush;
1351             }
1352             else {
1353                 convert(list, dpDeviceList);
1354                 std::thread exec(callback, dpDeviceList);
1355                 exec.detach();
1356                 result = OC_STACK_OK;
1357             }
1358         }
1359         else
1360         {
1361             result = OC_STACK_ERROR;
1362         }
1363
1364         return result;
1365     }
1366
1367     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1368             OCStackResult result)
1369     {
1370
1371         ClientCallbackContext::DirectPairingContext* context =
1372             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1373
1374         std::thread exec(context->callback, cloneDevice(peer), result);
1375         exec.detach();
1376     }
1377
1378     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1379             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1380     {
1381         if (!peer || !callback)
1382         {
1383             oclog() << "Invalid parameters" << std::flush;
1384             return OC_STACK_INVALID_PARAM;
1385         }
1386
1387         OCStackResult result = OC_STACK_ERROR;
1388         ClientCallbackContext::DirectPairingContext* context =
1389             new ClientCallbackContext::DirectPairingContext(callback);
1390
1391         auto cLock = m_csdkLock.lock();
1392         if (cLock)
1393         {
1394             std::lock_guard<std::recursive_mutex> lock(*cLock);
1395             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1396                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1397         }
1398         else
1399         {
1400             delete context;
1401             result = OC_STACK_ERROR;
1402         }
1403         return result;
1404     }
1405 }