Merge remote-tracking branch 'origin/master' into notification-service
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try{
169             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
170                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
171             // loop to ensure valid construction of all resources
172
173             for(auto resource : container.Resources())
174             {
175                 std::thread exec(context->callback, resource);
176                 exec.detach();
177             }
178         }
179         catch (std::exception &e){
180             oclog() << "Exception in listCallback, ignoring response: "
181                     << e.what() << std::flush;
182         }
183
184
185         return OC_STACK_KEEP_TRANSACTION;
186     }
187
188     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
189         OCClientResponse* clientResponse)
190     {
191         if (!ctx || !clientResponse)
192         {
193             return OC_STACK_KEEP_TRANSACTION;
194         }
195
196         ClientCallbackContext::ListenErrorContext* context =
197             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
198         if (!context)
199         {
200             return OC_STACK_KEEP_TRANSACTION;
201         }
202
203         OCStackResult result = clientResponse->result;
204         if (result == OC_STACK_OK)
205         {
206             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
207             {
208                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
209                     << std::flush;
210                 return OC_STACK_KEEP_TRANSACTION;
211             }
212
213             auto clientWrapper = context->clientWrapper.lock();
214
215             if (!clientWrapper)
216             {
217                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
218                         << std::flush;
219                 return OC_STACK_KEEP_TRANSACTION;
220             }
221
222             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
223                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
224             // loop to ensure valid construction of all resources
225             for (auto resource : container.Resources())
226             {
227                 std::thread exec(context->callback, resource);
228                 exec.detach();
229             }
230             return OC_STACK_KEEP_TRANSACTION;
231         }
232
233         std::string resourceURI = clientResponse->resourceUri;
234         std::thread exec(context->errorCallback, resourceURI, result);
235         exec.detach();
236         return OC_STACK_DELETE_TRANSACTION;
237     }
238
239     OCStackResult InProcClientWrapper::ListenForResource(
240             const std::string& serviceUrl,
241             const std::string& resourceType,
242             OCConnectivityType connectivityType,
243             FindCallback& callback, QualityOfService QoS)
244     {
245         if (!callback)
246         {
247             return OC_STACK_INVALID_PARAM;
248         }
249
250         OCStackResult result;
251         ostringstream resourceUri;
252         resourceUri << serviceUrl << resourceType;
253
254         ClientCallbackContext::ListenContext* context =
255             new ClientCallbackContext::ListenContext(callback, shared_from_this());
256         OCCallbackData cbdata;
257         cbdata.context = static_cast<void*>(context),
258         cbdata.cb      = listenCallback;
259         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
260
261         auto cLock = m_csdkLock.lock();
262         if (cLock)
263         {
264             std::lock_guard<std::recursive_mutex> lock(*cLock);
265             result = OCDoResource(nullptr, OC_REST_DISCOVER,
266                                   resourceUri.str().c_str(),
267                                   nullptr, nullptr, connectivityType,
268                                   static_cast<OCQualityOfService>(QoS),
269                                   &cbdata,
270                                   nullptr, 0);
271         }
272         else
273         {
274             delete context;
275             result = OC_STACK_ERROR;
276         }
277         return result;
278     }
279
280     OCStackResult InProcClientWrapper::ListenErrorForResource(
281             const std::string& serviceUrl,
282             const std::string& resourceType,
283             OCConnectivityType connectivityType,
284             FindCallback& callback, FindErrorCallback& errorCallback,
285             QualityOfService QoS)
286     {
287         if (!callback)
288         {
289             return OC_STACK_INVALID_PARAM;
290         }
291
292         ostringstream resourceUri;
293         resourceUri << serviceUrl << resourceType;
294
295         ClientCallbackContext::ListenErrorContext* context =
296             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
297                                                           shared_from_this());
298         if (!context)
299         {
300             return OC_STACK_ERROR;
301         }
302
303         OCCallbackData cbdata(
304                 static_cast<void*>(context),
305                 listenErrorCallback,
306                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
307             );
308
309         OCStackResult result;
310         auto cLock = m_csdkLock.lock();
311         if (cLock)
312         {
313             std::lock_guard<std::recursive_mutex> lock(*cLock);
314             result = OCDoResource(nullptr, OC_REST_DISCOVER,
315                                   resourceUri.str().c_str(),
316                                   nullptr, nullptr, connectivityType,
317                                   static_cast<OCQualityOfService>(QoS),
318                                   &cbdata,
319                                   nullptr, 0);
320         }
321         else
322         {
323             delete context;
324             result = OC_STACK_ERROR;
325         }
326         return result;
327     }
328 #ifdef WITH_MQ
329     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
330                                               OCClientResponse* clientResponse)
331     {
332         ClientCallbackContext::MQTopicContext* context =
333             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
334
335         if (!clientResponse || !context)
336         {
337             return OC_STACK_DELETE_TRANSACTION;
338         }
339
340         if (clientResponse->result != OC_STACK_OK)
341         {
342             oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
343                     << clientResponse->result
344                     << std::flush;
345
346             std::thread exec(context->callback, clientResponse->result,
347                              clientResponse->resourceUri, nullptr);
348             exec.detach();
349
350             return OC_STACK_DELETE_TRANSACTION;
351         }
352
353         auto clientWrapper = context->clientWrapper.lock();
354         if (!clientWrapper)
355         {
356             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
357                     << std::flush;
358             return OC_STACK_DELETE_TRANSACTION;
359         }
360
361         try{
362             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
363                                         (OCRepPayload *) clientResponse->payload);
364
365             // loop to ensure valid construction of all resources
366             for (auto resource : container.Resources())
367             {
368                 std::thread exec(context->callback, clientResponse->result,
369                                  clientResponse->resourceUri, resource);
370                 exec.detach();
371             }
372         }
373         catch (std::exception &e){
374             oclog() << "Exception in listCallback, ignoring response: "
375                     << e.what() << std::flush;
376         }
377
378         return OC_STACK_DELETE_TRANSACTION;
379     }
380
381     OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
382                                                         const std::string& resourceUri,
383                                                         const QueryParamsMap& queryParams,
384                                                         const HeaderOptions& headerOptions,
385                                                         MQTopicCallback& callback,
386                                                         QualityOfService QoS)
387     {
388         oclog() << "ListenForMQTopic()" << std::flush;
389
390         if (!callback)
391         {
392             return OC_STACK_INVALID_PARAM;
393         }
394
395         ClientCallbackContext::MQTopicContext* context =
396             new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
397         OCCallbackData cbdata;
398         cbdata.context = static_cast<void*>(context),
399         cbdata.cb      = listenMQCallback;
400         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
401
402         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
403
404         OCStackResult result = OC_STACK_ERROR;
405         auto cLock = m_csdkLock.lock();
406         if (cLock)
407         {
408             std::lock_guard<std::recursive_mutex> lock(*cLock);
409             OCHeaderOption options[MAX_HEADER_OPTIONS];
410             result = OCDoResource(
411                                   nullptr, OC_REST_GET,
412                                   uri.c_str(),
413                                   &devAddr, nullptr,
414                                   CT_DEFAULT,
415                                   static_cast<OCQualityOfService>(QoS),
416                                   &cbdata,
417                                   assembleHeaderOptions(options, headerOptions),
418                                   headerOptions.size());
419         }
420         else
421         {
422             delete context;
423         }
424
425         return result;
426     }
427 #endif
428
429     OCStackApplicationResult listenDeviceCallback(void* ctx,
430                                                   OCDoHandle /*handle*/,
431             OCClientResponse* clientResponse)
432     {
433         ClientCallbackContext::DeviceListenContext* context =
434             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
435
436         try
437         {
438             OCRepresentation rep = parseGetSetCallback(clientResponse);
439             std::thread exec(context->callback, rep);
440             exec.detach();
441         }
442         catch(OC::OCException& e)
443         {
444             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
445                 <<e.what() <<std::flush;
446         }
447
448         return OC_STACK_KEEP_TRANSACTION;
449     }
450
451     OCStackResult InProcClientWrapper::ListenForDevice(
452             const std::string& serviceUrl,
453             const std::string& deviceURI,
454             OCConnectivityType connectivityType,
455             FindDeviceCallback& callback,
456             QualityOfService QoS)
457     {
458         if (!callback)
459         {
460             return OC_STACK_INVALID_PARAM;
461         }
462         OCStackResult result;
463         ostringstream deviceUri;
464         deviceUri << serviceUrl << deviceURI;
465
466         ClientCallbackContext::DeviceListenContext* context =
467             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
468         OCCallbackData cbdata;
469
470         cbdata.context = static_cast<void*>(context),
471         cbdata.cb      = listenDeviceCallback;
472         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
473
474         auto cLock = m_csdkLock.lock();
475         if (cLock)
476         {
477             std::lock_guard<std::recursive_mutex> lock(*cLock);
478             result = OCDoResource(nullptr, OC_REST_DISCOVER,
479                                   deviceUri.str().c_str(),
480                                   nullptr, nullptr, connectivityType,
481                                   static_cast<OCQualityOfService>(QoS),
482                                   &cbdata,
483                                   nullptr, 0);
484         }
485         else
486         {
487             delete context;
488             result = OC_STACK_ERROR;
489         }
490         return result;
491     }
492
493     void parseServerHeaderOptions(OCClientResponse* clientResponse,
494                     HeaderOptions& serverHeaderOptions)
495     {
496         if (clientResponse)
497         {
498             // Parse header options from server
499             uint16_t optionID;
500             std::string optionData;
501
502             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
503             {
504                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
505                 optionData = reinterpret_cast<const char*>
506                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
507                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
508                 serverHeaderOptions.push_back(headerOption);
509             }
510         }
511         else
512         {
513             // clientResponse is invalid
514             // TODO check proper logging
515             std::cout << " Invalid response " << std::endl;
516         }
517     }
518
519 #ifdef WITH_MQ
520     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
521                     OCClientResponse* clientResponse)
522     {
523         ClientCallbackContext::MQTopicContext* context =
524             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
525         HeaderOptions serverHeaderOptions;
526
527         if (!clientResponse || !context)
528         {
529             return OC_STACK_DELETE_TRANSACTION;
530         }
531
532         std::string createdUri;
533         bool isLocationOption = false;
534         OCStackResult result = clientResponse->result;
535         if (OC_STACK_OK               == result ||
536             OC_STACK_RESOURCE_CREATED == result)
537         {
538             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
539
540             for (auto headerOption : serverHeaderOptions)
541             {
542                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
543                 {
544                     createdUri += "/";
545                     createdUri += headerOption.getOptionData();
546                     if (!isLocationOption)
547                     {
548                         isLocationOption = true;
549                     }
550                 }
551             }
552         }
553
554         if (!isLocationOption)
555         {
556             createdUri = clientResponse->resourceUri;
557         }
558
559         auto clientWrapper = context->clientWrapper.lock();
560
561         if (!clientWrapper)
562         {
563             oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
564                     << std::flush;
565             return OC_STACK_DELETE_TRANSACTION;
566         }
567
568         try{
569             if (OC_STACK_OK               == result ||
570                 OC_STACK_RESOURCE_CREATED == result)
571             {
572                 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
573                                             createdUri);
574                 for (auto resource : container.Resources())
575                 {
576                     std::thread exec(context->callback, result, createdUri, resource);
577                     exec.detach();
578                 }
579             }
580             else
581             {
582                 std::thread exec(context->callback, result, createdUri, nullptr);
583                 exec.detach();
584             }
585         }
586         catch (std::exception &e){
587             oclog() << "Exception in createMQTopicCallback, ignoring response: "
588                     << e.what() << std::flush;
589         }
590         return OC_STACK_DELETE_TRANSACTION;
591     }
592
593     OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
594                 const OCDevAddr& devAddr,
595                 const std::string& uri,
596                 const OCRepresentation& rep,
597                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
598                 MQTopicCallback& callback, QualityOfService QoS)
599     {
600         if (!callback)
601         {
602             return OC_STACK_INVALID_PARAM;
603         }
604         OCStackResult result;
605         ClientCallbackContext::MQTopicContext* ctx =
606                 new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
607         OCCallbackData cbdata;
608         cbdata.context = static_cast<void*>(ctx),
609         cbdata.cb      = createMQTopicCallback;
610         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
611
612         std::string url = assembleSetResourceUri(uri, queryParams);
613
614         auto cLock = m_csdkLock.lock();
615
616         if (cLock)
617         {
618             std::lock_guard<std::recursive_mutex> lock(*cLock);
619             OCHeaderOption options[MAX_HEADER_OPTIONS];
620
621             result = OCDoResource(nullptr, OC_REST_PUT,
622                                   url.c_str(), &devAddr,
623                                   assembleSetResourcePayload(rep),
624                                   CT_DEFAULT,
625                                   static_cast<OCQualityOfService>(QoS),
626                                   &cbdata,
627                                   assembleHeaderOptions(options, headerOptions),
628                                   headerOptions.size());
629         }
630         else
631         {
632             delete ctx;
633             result = OC_STACK_ERROR;
634         }
635
636         return result;
637     }
638 #endif
639     OCStackApplicationResult getResourceCallback(void* ctx,
640                                                  OCDoHandle /*handle*/,
641         OCClientResponse* clientResponse)
642     {
643         ClientCallbackContext::GetContext* context =
644             static_cast<ClientCallbackContext::GetContext*>(ctx);
645
646         OCRepresentation rep;
647         HeaderOptions serverHeaderOptions;
648         OCStackResult result = clientResponse->result;
649         if (result == OC_STACK_OK)
650         {
651             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
652             try
653             {
654                 rep = parseGetSetCallback(clientResponse);
655             }
656             catch(OC::OCException& e)
657             {
658                 result = e.code();
659             }
660         }
661
662         std::thread exec(context->callback, serverHeaderOptions, rep, result);
663         exec.detach();
664         return OC_STACK_DELETE_TRANSACTION;
665     }
666
667     OCStackResult InProcClientWrapper::GetResourceRepresentation(
668         const OCDevAddr& devAddr,
669         const std::string& resourceUri,
670         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
671         OCConnectivityType connectivityType,
672         GetCallback& callback, QualityOfService QoS)
673     {
674         if (!callback)
675         {
676             return OC_STACK_INVALID_PARAM;
677         }
678         OCStackResult result;
679         ClientCallbackContext::GetContext* ctx =
680             new ClientCallbackContext::GetContext(callback);
681         OCCallbackData cbdata;
682         cbdata.context = static_cast<void*>(ctx),
683         cbdata.cb      = getResourceCallback;
684         cbdata.cd      = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
685
686
687         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
688
689         auto cLock = m_csdkLock.lock();
690
691         if (cLock)
692         {
693             std::lock_guard<std::recursive_mutex> lock(*cLock);
694             OCHeaderOption options[MAX_HEADER_OPTIONS];
695
696             result = OCDoResource(
697                                   nullptr, OC_REST_GET,
698                                   uri.c_str(),
699                                   &devAddr, nullptr,
700                                   connectivityType,
701                                   static_cast<OCQualityOfService>(QoS),
702                                   &cbdata,
703                                   assembleHeaderOptions(options, headerOptions),
704                                   headerOptions.size());
705         }
706         else
707         {
708             delete ctx;
709             result = OC_STACK_ERROR;
710         }
711         return result;
712     }
713
714
715     OCStackApplicationResult setResourceCallback(void* ctx,
716                                                  OCDoHandle /*handle*/,
717         OCClientResponse* clientResponse)
718     {
719         ClientCallbackContext::SetContext* context =
720             static_cast<ClientCallbackContext::SetContext*>(ctx);
721         OCRepresentation attrs;
722         HeaderOptions serverHeaderOptions;
723
724         OCStackResult result = clientResponse->result;
725         if (OC_STACK_OK               == result ||
726             OC_STACK_RESOURCE_CREATED == result ||
727             OC_STACK_RESOURCE_DELETED == result ||
728             OC_STACK_RESOURCE_CHANGED == result)
729         {
730             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
731             try
732             {
733                 attrs = parseGetSetCallback(clientResponse);
734             }
735             catch(OC::OCException& e)
736             {
737                 result = e.code();
738             }
739         }
740
741         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
742         exec.detach();
743         return OC_STACK_DELETE_TRANSACTION;
744     }
745
746     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
747         const QueryParamsMap& queryParams)
748     {
749         if (!uri.empty())
750         {
751             if (uri.back() == '/')
752             {
753                 uri.resize(uri.size() - 1);
754             }
755         }
756
757         ostringstream paramsList;
758         if (queryParams.size() > 0)
759         {
760             paramsList << '?';
761         }
762
763         for (auto& param : queryParams)
764         {
765             paramsList << param.first <<'='<<param.second<<';';
766         }
767
768         std::string queryString = paramsList.str();
769
770         if (queryString.empty())
771         {
772             return uri;
773         }
774
775         if (queryString.back() == ';')
776         {
777             queryString.resize(queryString.size() - 1);
778         }
779
780         std::string ret = uri + queryString;
781         return ret;
782     }
783
784     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
785         const QueryParamsList& queryParams)
786     {
787         if (!uri.empty())
788         {
789             if (uri.back() == '/')
790             {
791                 uri.resize(uri.size() - 1);
792             }
793         }
794
795         ostringstream paramsList;
796         if (queryParams.size() > 0)
797         {
798             paramsList << '?';
799         }
800
801         for (auto& param : queryParams)
802         {
803             for (auto& paramList : param.second)
804             {
805                 paramsList << param.first << '=' << paramList;
806                 if (paramList != param.second.back())
807                 {
808                     paramsList << '&';
809                 }
810             }
811             paramsList << ';';
812         }
813
814         std::string queryString = paramsList.str();
815
816         if (queryString.empty())
817         {
818             return uri;
819         }
820
821         if (queryString.back() == ';')
822         {
823             queryString.resize(queryString.size() - 1);
824         }
825
826         std::string ret = uri + queryString;
827         return ret;
828     }
829
830     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
831     {
832         MessageContainer ocInfo;
833         ocInfo.addRepresentation(rep);
834         for(const OCRepresentation& r : rep.getChildren())
835         {
836             ocInfo.addRepresentation(r);
837         }
838
839         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
840     }
841
842     OCStackResult InProcClientWrapper::PostResourceRepresentation(
843         const OCDevAddr& devAddr,
844         const std::string& uri,
845         const OCRepresentation& rep,
846         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
847         OCConnectivityType connectivityType,
848         PostCallback& callback, QualityOfService QoS)
849     {
850         if (!callback)
851         {
852             return OC_STACK_INVALID_PARAM;
853         }
854         OCStackResult result;
855         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
856         OCCallbackData cbdata;
857         cbdata.context = static_cast<void*>(ctx),
858         cbdata.cb      = setResourceCallback;
859         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
860
861
862         std::string url = assembleSetResourceUri(uri, queryParams);
863
864         auto cLock = m_csdkLock.lock();
865
866         if (cLock)
867         {
868             std::lock_guard<std::recursive_mutex> lock(*cLock);
869             OCHeaderOption options[MAX_HEADER_OPTIONS];
870
871             result = OCDoResource(nullptr, OC_REST_POST,
872                                   url.c_str(), &devAddr,
873                                   assembleSetResourcePayload(rep),
874                                   connectivityType,
875                                   static_cast<OCQualityOfService>(QoS),
876                                   &cbdata,
877                                   assembleHeaderOptions(options, headerOptions),
878                                   headerOptions.size());
879         }
880         else
881         {
882             delete ctx;
883             result = OC_STACK_ERROR;
884         }
885
886         return result;
887     }
888
889     OCStackResult InProcClientWrapper::PutResourceRepresentation(
890         const OCDevAddr& devAddr,
891         const std::string& uri,
892         const OCRepresentation& rep,
893         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
894         PutCallback& callback, QualityOfService QoS)
895     {
896         if (!callback)
897         {
898             return OC_STACK_INVALID_PARAM;
899         }
900         OCStackResult result;
901         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
902         OCCallbackData cbdata;
903         cbdata.context = static_cast<void*>(ctx),
904         cbdata.cb      = setResourceCallback;
905         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
906
907
908         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
909
910         auto cLock = m_csdkLock.lock();
911
912         if (cLock)
913         {
914             std::lock_guard<std::recursive_mutex> lock(*cLock);
915             OCDoHandle handle;
916             OCHeaderOption options[MAX_HEADER_OPTIONS];
917
918             result = OCDoResource(&handle, OC_REST_PUT,
919                                   url.c_str(), &devAddr,
920                                   assembleSetResourcePayload(rep),
921                                   CT_DEFAULT,
922                                   static_cast<OCQualityOfService>(QoS),
923                                   &cbdata,
924                                   assembleHeaderOptions(options, headerOptions),
925                                   headerOptions.size());
926         }
927         else
928         {
929             delete ctx;
930             result = OC_STACK_ERROR;
931         }
932
933         return result;
934     }
935
936     OCStackApplicationResult deleteResourceCallback(void* ctx,
937                                                     OCDoHandle /*handle*/,
938         OCClientResponse* clientResponse)
939     {
940         ClientCallbackContext::DeleteContext* context =
941             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
942         HeaderOptions serverHeaderOptions;
943
944         if (clientResponse->result == OC_STACK_OK)
945         {
946             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
947         }
948         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
949         exec.detach();
950         return OC_STACK_DELETE_TRANSACTION;
951     }
952
953     OCStackResult InProcClientWrapper::DeleteResource(
954         const OCDevAddr& devAddr,
955         const std::string& uri,
956         const HeaderOptions& headerOptions,
957         OCConnectivityType connectivityType,
958         DeleteCallback& callback,
959         QualityOfService /*QoS*/)
960     {
961         if (!callback)
962         {
963             return OC_STACK_INVALID_PARAM;
964         }
965         OCStackResult result;
966         ClientCallbackContext::DeleteContext* ctx =
967             new ClientCallbackContext::DeleteContext(callback);
968         OCCallbackData cbdata;
969         cbdata.context = static_cast<void*>(ctx),
970         cbdata.cb      = deleteResourceCallback;
971         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
972
973
974         auto cLock = m_csdkLock.lock();
975
976         if (cLock)
977         {
978             OCHeaderOption options[MAX_HEADER_OPTIONS];
979
980             std::lock_guard<std::recursive_mutex> lock(*cLock);
981
982             result = OCDoResource(nullptr, OC_REST_DELETE,
983                                   uri.c_str(), &devAddr,
984                                   nullptr,
985                                   connectivityType,
986                                   static_cast<OCQualityOfService>(m_cfg.QoS),
987                                   &cbdata,
988                                   assembleHeaderOptions(options, headerOptions),
989                                   headerOptions.size());
990         }
991         else
992         {
993             delete ctx;
994             result = OC_STACK_ERROR;
995         }
996
997         return result;
998     }
999
1000     OCStackApplicationResult observeResourceCallback(void* ctx,
1001                                                      OCDoHandle /*handle*/,
1002         OCClientResponse* clientResponse)
1003     {
1004         ClientCallbackContext::ObserveContext* context =
1005             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
1006         OCRepresentation attrs;
1007         HeaderOptions serverHeaderOptions;
1008         uint32_t sequenceNumber = clientResponse->sequenceNumber;
1009         OCStackResult result = clientResponse->result;
1010         if (clientResponse->result == OC_STACK_OK)
1011         {
1012             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
1013             try
1014             {
1015                 attrs = parseGetSetCallback(clientResponse);
1016             }
1017             catch(OC::OCException& e)
1018             {
1019                 result = e.code();
1020             }
1021         }
1022         std::thread exec(context->callback, serverHeaderOptions, attrs,
1023                     result, sequenceNumber);
1024         exec.detach();
1025
1026         return OC_STACK_KEEP_TRANSACTION;
1027     }
1028
1029     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
1030         const OCDevAddr& devAddr,
1031         const std::string& uri,
1032         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
1033         ObserveCallback& callback, QualityOfService QoS)
1034     {
1035         if (!callback)
1036         {
1037             return OC_STACK_INVALID_PARAM;
1038         }
1039         OCStackResult result;
1040
1041         ClientCallbackContext::ObserveContext* ctx =
1042             new ClientCallbackContext::ObserveContext(callback);
1043         OCCallbackData cbdata;
1044         cbdata.context = static_cast<void*>(ctx),
1045         cbdata.cb      = observeResourceCallback;
1046         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1047
1048
1049         OCMethod method;
1050         if (observeType == ObserveType::Observe)
1051         {
1052             method = OC_REST_OBSERVE;
1053         }
1054         else if (observeType == ObserveType::ObserveAll)
1055         {
1056             method = OC_REST_OBSERVE_ALL;
1057         }
1058         else
1059         {
1060             method = OC_REST_OBSERVE_ALL;
1061         }
1062
1063         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
1064
1065         auto cLock = m_csdkLock.lock();
1066
1067         if (cLock)
1068         {
1069             std::lock_guard<std::recursive_mutex> lock(*cLock);
1070             OCHeaderOption options[MAX_HEADER_OPTIONS];
1071
1072             result = OCDoResource(handle, method,
1073                                   url.c_str(), &devAddr,
1074                                   nullptr,
1075                                   CT_DEFAULT,
1076                                   static_cast<OCQualityOfService>(QoS),
1077                                   &cbdata,
1078                                   assembleHeaderOptions(options, headerOptions),
1079                                   headerOptions.size());
1080         }
1081         else
1082         {
1083             delete ctx;
1084             return OC_STACK_ERROR;
1085         }
1086
1087         return result;
1088     }
1089
1090     OCStackResult InProcClientWrapper::CancelObserveResource(
1091             OCDoHandle handle,
1092             const std::string& /*host*/,
1093             const std::string& /*uri*/,
1094             const HeaderOptions& headerOptions,
1095             QualityOfService QoS)
1096     {
1097         OCStackResult result;
1098         auto cLock = m_csdkLock.lock();
1099
1100         if (cLock)
1101         {
1102             std::lock_guard<std::recursive_mutex> lock(*cLock);
1103             OCHeaderOption options[MAX_HEADER_OPTIONS];
1104
1105             result = OCCancel(handle,
1106                     static_cast<OCQualityOfService>(QoS),
1107                     assembleHeaderOptions(options, headerOptions),
1108                     headerOptions.size());
1109         }
1110         else
1111         {
1112             result = OC_STACK_ERROR;
1113         }
1114
1115         return result;
1116     }
1117
1118     OCStackApplicationResult subscribePresenceCallback(void* ctx,
1119                                                        OCDoHandle /*handle*/,
1120             OCClientResponse* clientResponse)
1121     {
1122         ClientCallbackContext::SubscribePresenceContext* context =
1123         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
1124
1125         /*
1126          * This a hack while we rethink presence subscription.
1127          */
1128         std::string url = clientResponse->devAddr.addr;
1129
1130         std::thread exec(context->callback, clientResponse->result,
1131                     clientResponse->sequenceNumber, url);
1132
1133         exec.detach();
1134
1135         return OC_STACK_KEEP_TRANSACTION;
1136     }
1137
1138     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
1139         const std::string& host, const std::string& resourceType,
1140         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
1141     {
1142         if (!presenceHandler)
1143         {
1144             return OC_STACK_INVALID_PARAM;
1145         }
1146
1147         ClientCallbackContext::SubscribePresenceContext* ctx =
1148             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
1149         OCCallbackData cbdata;
1150         cbdata.context = static_cast<void*>(ctx),
1151         cbdata.cb      = subscribePresenceCallback;
1152         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
1153
1154
1155         auto cLock = m_csdkLock.lock();
1156
1157         std::ostringstream os;
1158         os << host << OC_RSRVD_PRESENCE_URI;
1159
1160         if (!resourceType.empty())
1161         {
1162             os << "?rt=" << resourceType;
1163         }
1164
1165         if (!cLock)
1166         {
1167             delete ctx;
1168             return OC_STACK_ERROR;
1169         }
1170
1171         return OCDoResource(handle, OC_REST_PRESENCE,
1172                             os.str().c_str(), nullptr,
1173                             nullptr, connectivityType,
1174                             OC_LOW_QOS, &cbdata, NULL, 0);
1175     }
1176
1177     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
1178     {
1179         OCStackResult result;
1180         auto cLock = m_csdkLock.lock();
1181
1182         if (cLock)
1183         {
1184             std::lock_guard<std::recursive_mutex> lock(*cLock);
1185             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
1186         }
1187         else
1188         {
1189             result = OC_STACK_ERROR;
1190         }
1191
1192         return result;
1193     }
1194
1195 #ifdef WITH_CLOUD
1196     OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
1197                                                                const std::string& host,
1198                                                                const std::vector<std::string>& di,
1199                                                                OCConnectivityType connectivityType,
1200                                                                ObserveCallback& callback)
1201     {
1202         if (!callback)
1203         {
1204             return OC_STACK_INVALID_PARAM;
1205         }
1206         OCStackResult result;
1207
1208         ClientCallbackContext::ObserveContext* ctx =
1209             new ClientCallbackContext::ObserveContext(callback);
1210         OCCallbackData cbdata;
1211         cbdata.context = static_cast<void*>(ctx),
1212         cbdata.cb      = observeResourceCallback;
1213         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1214
1215         auto cLock = m_csdkLock.lock();
1216
1217         if (cLock)
1218         {
1219             std::lock_guard<std::recursive_mutex> lock(*cLock);
1220
1221             std::ostringstream os;
1222             os << host << OCF_RSRVD_DEVICE_PRESENCE_URI;
1223             QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
1224             std::string url = assembleSetResourceUri(os.str(), queryParams);
1225
1226             result = OCDoResource(handle, OC_REST_OBSERVE,
1227                                   url.c_str(), nullptr,
1228                                   nullptr, connectivityType,
1229                                   OC_LOW_QOS, &cbdata,
1230                                   nullptr, 0);
1231         }
1232         else
1233         {
1234             delete ctx;
1235             result = OC_STACK_ERROR;
1236         }
1237
1238         return result;
1239     }
1240 #endif
1241
1242     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
1243     {
1244         qos = m_cfg.QoS;
1245         return OC_STACK_OK;
1246     }
1247
1248     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
1249            const HeaderOptions& headerOptions)
1250     {
1251         int i = 0;
1252
1253         if ( headerOptions.size() == 0)
1254         {
1255             return nullptr;
1256         }
1257
1258         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
1259         {
1260             options[i] = OCHeaderOption();
1261             options[i].protocolID = OC_COAP_ID;
1262             options[i].optionID = it->getOptionID();
1263             options[i].optionLength = it->getOptionData().length() + 1;
1264             strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));
1265             i++;
1266         }
1267
1268         return options;
1269     }
1270
1271     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
1272     {
1273         if (!dev)
1274         {
1275             return nullptr;
1276         }
1277
1278         OCDPDev_t* result = new OCDPDev_t(*dev);
1279         result->prm = new OCPrm_t[dev->prmLen];
1280         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
1281         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
1282     }
1283
1284     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
1285     {
1286         while(list)
1287         {
1288             dpList.push_back(cloneDevice(list));
1289             list = list->next;
1290         }
1291     }
1292
1293     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
1294             GetDirectPairedCallback& callback)
1295     {
1296         if (!callback || 0 == waittime)
1297         {
1298             return OC_STACK_INVALID_PARAM;
1299         }
1300
1301         OCStackResult result = OC_STACK_ERROR;
1302         const OCDPDev_t *list = nullptr;
1303         PairedDevices dpDeviceList;
1304
1305         auto cLock = m_csdkLock.lock();
1306
1307         if (cLock)
1308         {
1309             std::lock_guard<std::recursive_mutex> lock(*cLock);
1310
1311             list = OCDiscoverDirectPairingDevices(waittime);
1312             if (NULL == list)
1313             {
1314                 result = OC_STACK_NO_RESOURCE;
1315                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1316                     << std::flush;
1317             }
1318             else {
1319                 convert(list, dpDeviceList);
1320                 std::thread exec(callback, dpDeviceList);
1321                 exec.detach();
1322                 result = OC_STACK_OK;
1323             }
1324         }
1325         else
1326         {
1327             result = OC_STACK_ERROR;
1328         }
1329
1330         return result;
1331     }
1332
1333     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1334     {
1335         if (!callback)
1336         {
1337             return OC_STACK_INVALID_PARAM;
1338         }
1339
1340         OCStackResult result = OC_STACK_ERROR;
1341         const OCDPDev_t *list = nullptr;
1342         PairedDevices dpDeviceList;
1343
1344         auto cLock = m_csdkLock.lock();
1345
1346         if (cLock)
1347         {
1348             std::lock_guard<std::recursive_mutex> lock(*cLock);
1349
1350             list = OCGetDirectPairedDevices();
1351             if (NULL == list)
1352             {
1353                 result = OC_STACK_NO_RESOURCE;
1354                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1355                     << std::flush;
1356             }
1357             else {
1358                 convert(list, dpDeviceList);
1359                 std::thread exec(callback, dpDeviceList);
1360                 exec.detach();
1361                 result = OC_STACK_OK;
1362             }
1363         }
1364         else
1365         {
1366             result = OC_STACK_ERROR;
1367         }
1368
1369         return result;
1370     }
1371
1372     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1373             OCStackResult result)
1374     {
1375
1376         ClientCallbackContext::DirectPairingContext* context =
1377             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1378
1379         std::thread exec(context->callback, cloneDevice(peer), result);
1380         exec.detach();
1381     }
1382
1383     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1384             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1385     {
1386         if (!peer || !callback)
1387         {
1388             oclog() << "Invalid parameters" << std::flush;
1389             return OC_STACK_INVALID_PARAM;
1390         }
1391
1392         OCStackResult result = OC_STACK_ERROR;
1393         ClientCallbackContext::DirectPairingContext* context =
1394             new ClientCallbackContext::DirectPairingContext(callback);
1395
1396         auto cLock = m_csdkLock.lock();
1397         if (cLock)
1398         {
1399             std::lock_guard<std::recursive_mutex> lock(*cLock);
1400             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1401                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1402         }
1403         else
1404         {
1405             delete context;
1406             result = OC_STACK_ERROR;
1407         }
1408         return result;
1409     }
1410 }