Fix for IOT-1394.
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try
169         {
170             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
171                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
172             // loop to ensure valid construction of all resources
173
174             for(auto resource : container.Resources())
175             {
176                 std::thread exec(context->callback, resource);
177                 exec.detach();
178             }
179         }
180         catch (std::exception &e)
181         {
182             oclog() << "Exception in listCallback, ignoring response: "
183                     << e.what() << std::flush;
184         }
185
186
187         return OC_STACK_KEEP_TRANSACTION;
188     }
189
190     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
191         OCClientResponse* clientResponse)
192     {
193         if (!ctx || !clientResponse)
194         {
195             return OC_STACK_KEEP_TRANSACTION;
196         }
197
198         ClientCallbackContext::ListenErrorContext* context =
199             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
200         if (!context)
201         {
202             return OC_STACK_KEEP_TRANSACTION;
203         }
204
205         OCStackResult result = clientResponse->result;
206         if (result == OC_STACK_OK)
207         {
208             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
209             {
210                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
211                     << std::flush;
212                 return OC_STACK_KEEP_TRANSACTION;
213             }
214
215             auto clientWrapper = context->clientWrapper.lock();
216
217             if (!clientWrapper)
218             {
219                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
220                         << std::flush;
221                 return OC_STACK_KEEP_TRANSACTION;
222             }
223
224             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
225                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
226             // loop to ensure valid construction of all resources
227             for (auto resource : container.Resources())
228             {
229                 std::thread exec(context->callback, resource);
230                 exec.detach();
231             }
232             return OC_STACK_KEEP_TRANSACTION;
233         }
234
235         std::string resourceURI = clientResponse->resourceUri;
236         std::thread exec(context->errorCallback, resourceURI, result);
237         exec.detach();
238         return OC_STACK_KEEP_TRANSACTION;
239     }
240
241     OCStackResult InProcClientWrapper::ListenForResource(
242             const std::string& serviceUrl,
243             const std::string& resourceType,
244             OCConnectivityType connectivityType,
245             FindCallback& callback, QualityOfService QoS)
246     {
247         if (!callback)
248         {
249             return OC_STACK_INVALID_PARAM;
250         }
251
252         OCStackResult result;
253         ostringstream resourceUri;
254         resourceUri << serviceUrl << resourceType;
255
256         ClientCallbackContext::ListenContext* context =
257             new ClientCallbackContext::ListenContext(callback, shared_from_this());
258         OCCallbackData cbdata;
259         cbdata.context = static_cast<void*>(context),
260         cbdata.cb      = listenCallback;
261         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
262
263         auto cLock = m_csdkLock.lock();
264         if (cLock)
265         {
266             std::lock_guard<std::recursive_mutex> lock(*cLock);
267             result = OCDoResource(nullptr, OC_REST_DISCOVER,
268                                   resourceUri.str().c_str(),
269                                   nullptr, nullptr, connectivityType,
270                                   static_cast<OCQualityOfService>(QoS),
271                                   &cbdata,
272                                   nullptr, 0);
273         }
274         else
275         {
276             delete context;
277             result = OC_STACK_ERROR;
278         }
279         return result;
280     }
281
282     OCStackResult InProcClientWrapper::ListenErrorForResource(
283             const std::string& serviceUrl,
284             const std::string& resourceType,
285             OCConnectivityType connectivityType,
286             FindCallback& callback, FindErrorCallback& errorCallback,
287             QualityOfService QoS)
288     {
289         if (!callback)
290         {
291             return OC_STACK_INVALID_PARAM;
292         }
293
294         ostringstream resourceUri;
295         resourceUri << serviceUrl << resourceType;
296
297         ClientCallbackContext::ListenErrorContext* context =
298             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
299                                                           shared_from_this());
300         if (!context)
301         {
302             return OC_STACK_ERROR;
303         }
304
305         OCCallbackData cbdata(
306                 static_cast<void*>(context),
307                 listenErrorCallback,
308                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
309             );
310
311         OCStackResult result;
312         auto cLock = m_csdkLock.lock();
313         if (cLock)
314         {
315             std::lock_guard<std::recursive_mutex> lock(*cLock);
316             result = OCDoResource(nullptr, OC_REST_DISCOVER,
317                                   resourceUri.str().c_str(),
318                                   nullptr, nullptr, connectivityType,
319                                   static_cast<OCQualityOfService>(QoS),
320                                   &cbdata,
321                                   nullptr, 0);
322         }
323         else
324         {
325             delete context;
326             result = OC_STACK_ERROR;
327         }
328         return result;
329     }
330 #ifdef WITH_MQ
331     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
332                                               OCClientResponse* clientResponse)
333     {
334         ClientCallbackContext::MQTopicContext* context =
335             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
336
337         if (!clientResponse || !context)
338         {
339             return OC_STACK_DELETE_TRANSACTION;
340         }
341
342         std::string resourceURI = clientResponse->resourceUri;
343         if (clientResponse->result != OC_STACK_OK)
344         {
345             oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
346                     << clientResponse->result
347                     << std::flush;
348
349             std::thread exec(context->callback, clientResponse->result,
350                              resourceURI, nullptr);
351             exec.detach();
352
353             return OC_STACK_DELETE_TRANSACTION;
354         }
355
356         auto clientWrapper = context->clientWrapper.lock();
357         if (!clientWrapper)
358         {
359             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
360                     << std::flush;
361             return OC_STACK_DELETE_TRANSACTION;
362         }
363
364         try
365         {
366             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
367                                         (OCRepPayload *) clientResponse->payload);
368
369             // loop to ensure valid construction of all resources
370             for (auto resource : container.Resources())
371             {
372                 std::thread exec(context->callback, clientResponse->result,
373                                  resourceURI, resource);
374                 exec.detach();
375             }
376         }
377         catch (std::exception &e)
378         {
379             oclog() << "Exception in listCallback, ignoring response: "
380                     << e.what() << std::flush;
381         }
382
383         return OC_STACK_DELETE_TRANSACTION;
384     }
385
386     OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
387                                                         const std::string& resourceUri,
388                                                         const QueryParamsMap& queryParams,
389                                                         const HeaderOptions& headerOptions,
390                                                         MQTopicCallback& callback,
391                                                         QualityOfService QoS)
392     {
393         oclog() << "ListenForMQTopic()" << std::flush;
394
395         if (!callback)
396         {
397             return OC_STACK_INVALID_PARAM;
398         }
399
400         ClientCallbackContext::MQTopicContext* context =
401             new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
402         OCCallbackData cbdata;
403         cbdata.context = static_cast<void*>(context),
404         cbdata.cb      = listenMQCallback;
405         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
406
407         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
408
409         OCStackResult result = OC_STACK_ERROR;
410         auto cLock = m_csdkLock.lock();
411         if (cLock)
412         {
413             std::lock_guard<std::recursive_mutex> lock(*cLock);
414             OCHeaderOption options[MAX_HEADER_OPTIONS];
415             result = OCDoResource(
416                                   nullptr, OC_REST_GET,
417                                   uri.c_str(),
418                                   &devAddr, nullptr,
419                                   CT_DEFAULT,
420                                   static_cast<OCQualityOfService>(QoS),
421                                   &cbdata,
422                                   assembleHeaderOptions(options, headerOptions),
423                                   headerOptions.size());
424         }
425         else
426         {
427             delete context;
428         }
429
430         return result;
431     }
432 #endif
433
434     OCStackApplicationResult listenDeviceCallback(void* ctx,
435                                                   OCDoHandle /*handle*/,
436             OCClientResponse* clientResponse)
437     {
438         ClientCallbackContext::DeviceListenContext* context =
439             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
440
441         try
442         {
443             OCRepresentation rep = parseGetSetCallback(clientResponse);
444             std::thread exec(context->callback, rep);
445             exec.detach();
446         }
447         catch(OC::OCException& e)
448         {
449             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
450                 <<e.what() <<std::flush;
451         }
452
453         return OC_STACK_KEEP_TRANSACTION;
454     }
455
456     OCStackResult InProcClientWrapper::ListenForDevice(
457             const std::string& serviceUrl,
458             const std::string& deviceURI,
459             OCConnectivityType connectivityType,
460             FindDeviceCallback& callback,
461             QualityOfService QoS)
462     {
463         if (!callback)
464         {
465             return OC_STACK_INVALID_PARAM;
466         }
467         OCStackResult result;
468         ostringstream deviceUri;
469         deviceUri << serviceUrl << deviceURI;
470
471         ClientCallbackContext::DeviceListenContext* context =
472             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
473         OCCallbackData cbdata;
474
475         cbdata.context = static_cast<void*>(context),
476         cbdata.cb      = listenDeviceCallback;
477         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
478
479         auto cLock = m_csdkLock.lock();
480         if (cLock)
481         {
482             std::lock_guard<std::recursive_mutex> lock(*cLock);
483             result = OCDoResource(nullptr, OC_REST_DISCOVER,
484                                   deviceUri.str().c_str(),
485                                   nullptr, nullptr, connectivityType,
486                                   static_cast<OCQualityOfService>(QoS),
487                                   &cbdata,
488                                   nullptr, 0);
489         }
490         else
491         {
492             delete context;
493             result = OC_STACK_ERROR;
494         }
495         return result;
496     }
497
498     void parseServerHeaderOptions(OCClientResponse* clientResponse,
499                     HeaderOptions& serverHeaderOptions)
500     {
501         if (clientResponse)
502         {
503             // Parse header options from server
504             uint16_t optionID;
505             std::string optionData;
506
507             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
508             {
509                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
510                 optionData = reinterpret_cast<const char*>
511                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
512                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
513                 serverHeaderOptions.push_back(headerOption);
514             }
515         }
516         else
517         {
518             // clientResponse is invalid
519             // TODO check proper logging
520             std::cout << " Invalid response " << std::endl;
521         }
522     }
523
524 #ifdef WITH_MQ
525     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
526                     OCClientResponse* clientResponse)
527     {
528         ClientCallbackContext::MQTopicContext* context =
529             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
530         HeaderOptions serverHeaderOptions;
531
532         if (!clientResponse || !context)
533         {
534             return OC_STACK_DELETE_TRANSACTION;
535         }
536
537         std::string createdUri;
538         bool isLocationOption = false;
539         OCStackResult result = clientResponse->result;
540         if (OC_STACK_OK               == result ||
541             OC_STACK_RESOURCE_CREATED == result)
542         {
543             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
544
545             for (auto headerOption : serverHeaderOptions)
546             {
547                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
548                 {
549                     createdUri += "/";
550                     createdUri += headerOption.getOptionData();
551                     if (!isLocationOption)
552                     {
553                         isLocationOption = true;
554                     }
555                 }
556             }
557         }
558
559         if (!isLocationOption)
560         {
561             createdUri = std::string(clientResponse->resourceUri);
562         }
563
564         auto clientWrapper = context->clientWrapper.lock();
565
566         if (!clientWrapper)
567         {
568             oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
569                     << std::flush;
570             return OC_STACK_DELETE_TRANSACTION;
571         }
572
573         try
574         {
575             if (OC_STACK_OK               == result ||
576                 OC_STACK_RESOURCE_CREATED == result)
577             {
578                 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
579                                             createdUri);
580                 for (auto resource : container.Resources())
581                 {
582                     std::thread exec(context->callback, result,
583                                      createdUri,
584                                      resource);
585                     exec.detach();
586                 }
587             }
588             else
589             {
590                 std::thread exec(context->callback, result,
591                                  createdUri,
592                                  nullptr);
593                 exec.detach();
594             }
595         }
596         catch (std::exception &e)
597         {
598             oclog() << "Exception in createMQTopicCallback, ignoring response: "
599                     << e.what() << std::flush;
600         }
601         return OC_STACK_DELETE_TRANSACTION;
602     }
603
604     OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
605                 const OCDevAddr& devAddr,
606                 const std::string& uri,
607                 const OCRepresentation& rep,
608                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
609                 MQTopicCallback& callback, QualityOfService QoS)
610     {
611         if (!callback)
612         {
613             return OC_STACK_INVALID_PARAM;
614         }
615         OCStackResult result;
616         ClientCallbackContext::MQTopicContext* ctx =
617                 new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
618         OCCallbackData cbdata;
619         cbdata.context = static_cast<void*>(ctx),
620         cbdata.cb      = createMQTopicCallback;
621         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
622
623         std::string url = assembleSetResourceUri(uri, queryParams);
624
625         auto cLock = m_csdkLock.lock();
626
627         if (cLock)
628         {
629             std::lock_guard<std::recursive_mutex> lock(*cLock);
630             OCHeaderOption options[MAX_HEADER_OPTIONS];
631
632             result = OCDoResource(nullptr, OC_REST_PUT,
633                                   url.c_str(), &devAddr,
634                                   assembleSetResourcePayload(rep),
635                                   CT_DEFAULT,
636                                   static_cast<OCQualityOfService>(QoS),
637                                   &cbdata,
638                                   assembleHeaderOptions(options, headerOptions),
639                                   headerOptions.size());
640         }
641         else
642         {
643             delete ctx;
644             result = OC_STACK_ERROR;
645         }
646
647         return result;
648     }
649 #endif
650     OCStackApplicationResult getResourceCallback(void* ctx,
651                                                  OCDoHandle /*handle*/,
652         OCClientResponse* clientResponse)
653     {
654         ClientCallbackContext::GetContext* context =
655             static_cast<ClientCallbackContext::GetContext*>(ctx);
656
657         OCRepresentation rep;
658         HeaderOptions serverHeaderOptions;
659         OCStackResult result = clientResponse->result;
660
661         parseServerHeaderOptions(clientResponse, serverHeaderOptions);
662         try
663         {
664             rep = parseGetSetCallback(clientResponse);
665         }
666         catch(OC::OCException& e)
667         {
668             result = e.code();
669         }
670
671         std::thread exec(context->callback, serverHeaderOptions, rep, result);
672         exec.detach();
673         return OC_STACK_DELETE_TRANSACTION;
674     }
675
676     OCStackResult InProcClientWrapper::GetResourceRepresentation(
677         const OCDevAddr& devAddr,
678         const std::string& resourceUri,
679         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
680         OCConnectivityType connectivityType,
681         GetCallback& callback, QualityOfService QoS)
682     {
683         if (!callback)
684         {
685             return OC_STACK_INVALID_PARAM;
686         }
687         OCStackResult result;
688         ClientCallbackContext::GetContext* ctx =
689             new ClientCallbackContext::GetContext(callback);
690         OCCallbackData cbdata;
691         cbdata.context = static_cast<void*>(ctx),
692         cbdata.cb      = getResourceCallback;
693         cbdata.cd      = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
694
695
696         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
697
698         auto cLock = m_csdkLock.lock();
699
700         if (cLock)
701         {
702             std::lock_guard<std::recursive_mutex> lock(*cLock);
703             OCHeaderOption options[MAX_HEADER_OPTIONS];
704
705             result = OCDoResource(
706                                   nullptr, OC_REST_GET,
707                                   uri.c_str(),
708                                   &devAddr, nullptr,
709                                   connectivityType,
710                                   static_cast<OCQualityOfService>(QoS),
711                                   &cbdata,
712                                   assembleHeaderOptions(options, headerOptions),
713                                   headerOptions.size());
714         }
715         else
716         {
717             delete ctx;
718             result = OC_STACK_ERROR;
719         }
720         return result;
721     }
722
723
724     OCStackApplicationResult setResourceCallback(void* ctx,
725                                                  OCDoHandle /*handle*/,
726         OCClientResponse* clientResponse)
727     {
728         ClientCallbackContext::SetContext* context =
729             static_cast<ClientCallbackContext::SetContext*>(ctx);
730         OCRepresentation attrs;
731         HeaderOptions serverHeaderOptions;
732
733         OCStackResult result = clientResponse->result;
734
735         parseServerHeaderOptions(clientResponse, serverHeaderOptions);
736         try
737         {
738             attrs = parseGetSetCallback(clientResponse);
739         }
740         catch(OC::OCException& e)
741         {
742             result = e.code();
743         }
744
745         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
746         exec.detach();
747         return OC_STACK_DELETE_TRANSACTION;
748     }
749
750     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
751         const QueryParamsMap& queryParams)
752     {
753         if (!uri.empty())
754         {
755             if (uri.back() == '/')
756             {
757                 uri.resize(uri.size() - 1);
758             }
759         }
760
761         ostringstream paramsList;
762         if (queryParams.size() > 0)
763         {
764             paramsList << '?';
765         }
766
767         for (auto& param : queryParams)
768         {
769             paramsList << param.first <<'='<<param.second<<';';
770         }
771
772         std::string queryString = paramsList.str();
773
774         if (queryString.empty())
775         {
776             return uri;
777         }
778
779         if (queryString.back() == ';')
780         {
781             queryString.resize(queryString.size() - 1);
782         }
783
784         std::string ret = uri + queryString;
785         return ret;
786     }
787
788     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
789         const QueryParamsList& queryParams)
790     {
791         if (!uri.empty())
792         {
793             if (uri.back() == '/')
794             {
795                 uri.resize(uri.size() - 1);
796             }
797         }
798
799         ostringstream paramsList;
800         if (queryParams.size() > 0)
801         {
802             paramsList << '?';
803         }
804
805         for (auto& param : queryParams)
806         {
807             for (auto& paramList : param.second)
808             {
809                 paramsList << param.first << '=' << paramList << ';';
810             }
811         }
812
813         std::string queryString = paramsList.str();
814
815         if (queryString.empty())
816         {
817             return uri;
818         }
819
820         if (queryString.back() == ';')
821         {
822             queryString.resize(queryString.size() - 1);
823         }
824
825         std::string ret = uri + queryString;
826         return ret;
827     }
828
829     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
830     {
831         MessageContainer ocInfo;
832         ocInfo.addRepresentation(rep);
833         for(const OCRepresentation& r : rep.getChildren())
834         {
835             ocInfo.addRepresentation(r);
836         }
837
838         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
839     }
840
841     OCStackResult InProcClientWrapper::PostResourceRepresentation(
842         const OCDevAddr& devAddr,
843         const std::string& uri,
844         const OCRepresentation& rep,
845         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
846         OCConnectivityType connectivityType,
847         PostCallback& callback, QualityOfService QoS)
848     {
849         if (!callback)
850         {
851             return OC_STACK_INVALID_PARAM;
852         }
853         OCStackResult result;
854         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
855         OCCallbackData cbdata;
856         cbdata.context = static_cast<void*>(ctx),
857         cbdata.cb      = setResourceCallback;
858         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
859
860
861         std::string url = assembleSetResourceUri(uri, queryParams);
862
863         auto cLock = m_csdkLock.lock();
864
865         if (cLock)
866         {
867             std::lock_guard<std::recursive_mutex> lock(*cLock);
868             OCHeaderOption options[MAX_HEADER_OPTIONS];
869
870             result = OCDoResource(nullptr, OC_REST_POST,
871                                   url.c_str(), &devAddr,
872                                   assembleSetResourcePayload(rep),
873                                   connectivityType,
874                                   static_cast<OCQualityOfService>(QoS),
875                                   &cbdata,
876                                   assembleHeaderOptions(options, headerOptions),
877                                   headerOptions.size());
878         }
879         else
880         {
881             delete ctx;
882             result = OC_STACK_ERROR;
883         }
884
885         return result;
886     }
887
888     OCStackResult InProcClientWrapper::PutResourceRepresentation(
889         const OCDevAddr& devAddr,
890         const std::string& uri,
891         const OCRepresentation& rep,
892         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
893         PutCallback& callback, QualityOfService QoS)
894     {
895         if (!callback)
896         {
897             return OC_STACK_INVALID_PARAM;
898         }
899         OCStackResult result;
900         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
901         OCCallbackData cbdata;
902         cbdata.context = static_cast<void*>(ctx),
903         cbdata.cb      = setResourceCallback;
904         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
905
906
907         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
908
909         auto cLock = m_csdkLock.lock();
910
911         if (cLock)
912         {
913             std::lock_guard<std::recursive_mutex> lock(*cLock);
914             OCDoHandle handle;
915             OCHeaderOption options[MAX_HEADER_OPTIONS];
916
917             result = OCDoResource(&handle, OC_REST_PUT,
918                                   url.c_str(), &devAddr,
919                                   assembleSetResourcePayload(rep),
920                                   CT_DEFAULT,
921                                   static_cast<OCQualityOfService>(QoS),
922                                   &cbdata,
923                                   assembleHeaderOptions(options, headerOptions),
924                                   headerOptions.size());
925         }
926         else
927         {
928             delete ctx;
929             result = OC_STACK_ERROR;
930         }
931
932         return result;
933     }
934
935     OCStackApplicationResult deleteResourceCallback(void* ctx,
936                                                     OCDoHandle /*handle*/,
937         OCClientResponse* clientResponse)
938     {
939         ClientCallbackContext::DeleteContext* context =
940             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
941         HeaderOptions serverHeaderOptions;
942
943         parseServerHeaderOptions(clientResponse, serverHeaderOptions);
944
945         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
946         exec.detach();
947         return OC_STACK_DELETE_TRANSACTION;
948     }
949
950     OCStackResult InProcClientWrapper::DeleteResource(
951         const OCDevAddr& devAddr,
952         const std::string& uri,
953         const HeaderOptions& headerOptions,
954         OCConnectivityType connectivityType,
955         DeleteCallback& callback,
956         QualityOfService /*QoS*/)
957     {
958         if (!callback)
959         {
960             return OC_STACK_INVALID_PARAM;
961         }
962         OCStackResult result;
963         ClientCallbackContext::DeleteContext* ctx =
964             new ClientCallbackContext::DeleteContext(callback);
965         OCCallbackData cbdata;
966         cbdata.context = static_cast<void*>(ctx),
967         cbdata.cb      = deleteResourceCallback;
968         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
969
970
971         auto cLock = m_csdkLock.lock();
972
973         if (cLock)
974         {
975             OCHeaderOption options[MAX_HEADER_OPTIONS];
976
977             std::lock_guard<std::recursive_mutex> lock(*cLock);
978
979             result = OCDoResource(nullptr, OC_REST_DELETE,
980                                   uri.c_str(), &devAddr,
981                                   nullptr,
982                                   connectivityType,
983                                   static_cast<OCQualityOfService>(m_cfg.QoS),
984                                   &cbdata,
985                                   assembleHeaderOptions(options, headerOptions),
986                                   headerOptions.size());
987         }
988         else
989         {
990             delete ctx;
991             result = OC_STACK_ERROR;
992         }
993
994         return result;
995     }
996
997     OCStackApplicationResult observeResourceCallback(void* ctx,
998                                                      OCDoHandle /*handle*/,
999         OCClientResponse* clientResponse)
1000     {
1001         ClientCallbackContext::ObserveContext* context =
1002             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
1003         OCRepresentation attrs;
1004         HeaderOptions serverHeaderOptions;
1005         uint32_t sequenceNumber = clientResponse->sequenceNumber;
1006         OCStackResult result = clientResponse->result;
1007
1008         parseServerHeaderOptions(clientResponse, serverHeaderOptions);
1009         try
1010         {
1011             attrs = parseGetSetCallback(clientResponse);
1012         }
1013         catch(OC::OCException& e)
1014         {
1015             result = e.code();
1016         }
1017
1018         std::thread exec(context->callback, serverHeaderOptions, attrs,
1019                     result, sequenceNumber);
1020         exec.detach();
1021         if (sequenceNumber == MAX_SEQUENCE_NUMBER + 1)
1022         {
1023             return OC_STACK_DELETE_TRANSACTION;
1024         }
1025
1026         return OC_STACK_KEEP_TRANSACTION;
1027     }
1028
1029     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
1030         const OCDevAddr& devAddr,
1031         const std::string& uri,
1032         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
1033         ObserveCallback& callback, QualityOfService QoS)
1034     {
1035         if (!callback)
1036         {
1037             return OC_STACK_INVALID_PARAM;
1038         }
1039         OCStackResult result;
1040
1041         ClientCallbackContext::ObserveContext* ctx =
1042             new ClientCallbackContext::ObserveContext(callback);
1043         OCCallbackData cbdata;
1044         cbdata.context = static_cast<void*>(ctx),
1045         cbdata.cb      = observeResourceCallback;
1046         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1047
1048
1049         OCMethod method;
1050         if (observeType == ObserveType::Observe)
1051         {
1052             method = OC_REST_OBSERVE;
1053         }
1054         else if (observeType == ObserveType::ObserveAll)
1055         {
1056             method = OC_REST_OBSERVE_ALL;
1057         }
1058         else
1059         {
1060             method = OC_REST_OBSERVE_ALL;
1061         }
1062
1063         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
1064
1065         auto cLock = m_csdkLock.lock();
1066
1067         if (cLock)
1068         {
1069             std::lock_guard<std::recursive_mutex> lock(*cLock);
1070             OCHeaderOption options[MAX_HEADER_OPTIONS];
1071
1072             result = OCDoResource(handle, method,
1073                                   url.c_str(), &devAddr,
1074                                   nullptr,
1075                                   CT_DEFAULT,
1076                                   static_cast<OCQualityOfService>(QoS),
1077                                   &cbdata,
1078                                   assembleHeaderOptions(options, headerOptions),
1079                                   headerOptions.size());
1080         }
1081         else
1082         {
1083             delete ctx;
1084             return OC_STACK_ERROR;
1085         }
1086
1087         return result;
1088     }
1089
1090     OCStackResult InProcClientWrapper::CancelObserveResource(
1091             OCDoHandle handle,
1092             const std::string& /*host*/,
1093             const std::string& /*uri*/,
1094             const HeaderOptions& headerOptions,
1095             QualityOfService QoS)
1096     {
1097         OCStackResult result;
1098         auto cLock = m_csdkLock.lock();
1099
1100         if (cLock)
1101         {
1102             std::lock_guard<std::recursive_mutex> lock(*cLock);
1103             OCHeaderOption options[MAX_HEADER_OPTIONS];
1104
1105             result = OCCancel(handle,
1106                     static_cast<OCQualityOfService>(QoS),
1107                     assembleHeaderOptions(options, headerOptions),
1108                     headerOptions.size());
1109         }
1110         else
1111         {
1112             result = OC_STACK_ERROR;
1113         }
1114
1115         return result;
1116     }
1117
1118     OCStackApplicationResult subscribePresenceCallback(void* ctx,
1119                                                        OCDoHandle /*handle*/,
1120             OCClientResponse* clientResponse)
1121     {
1122         ClientCallbackContext::SubscribePresenceContext* context =
1123         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
1124
1125         /*
1126          * This a hack while we rethink presence subscription.
1127          */
1128         std::string url = clientResponse->devAddr.addr;
1129
1130         std::thread exec(context->callback, clientResponse->result,
1131                     clientResponse->sequenceNumber, url);
1132
1133         exec.detach();
1134
1135         return OC_STACK_KEEP_TRANSACTION;
1136     }
1137
1138     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
1139         const std::string& host, const std::string& resourceType,
1140         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
1141     {
1142         if (!presenceHandler)
1143         {
1144             return OC_STACK_INVALID_PARAM;
1145         }
1146
1147         ClientCallbackContext::SubscribePresenceContext* ctx =
1148             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
1149         OCCallbackData cbdata;
1150         cbdata.context = static_cast<void*>(ctx),
1151         cbdata.cb      = subscribePresenceCallback;
1152         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
1153
1154
1155         auto cLock = m_csdkLock.lock();
1156
1157         std::ostringstream os;
1158         os << host << OC_RSRVD_PRESENCE_URI;
1159
1160         if (!resourceType.empty())
1161         {
1162             os << "?rt=" << resourceType;
1163         }
1164
1165         if (!cLock)
1166         {
1167             delete ctx;
1168             return OC_STACK_ERROR;
1169         }
1170
1171         return OCDoResource(handle, OC_REST_PRESENCE,
1172                             os.str().c_str(), nullptr,
1173                             nullptr, connectivityType,
1174                             OC_LOW_QOS, &cbdata, NULL, 0);
1175     }
1176
1177     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
1178     {
1179         OCStackResult result;
1180         auto cLock = m_csdkLock.lock();
1181
1182         if (cLock)
1183         {
1184             std::lock_guard<std::recursive_mutex> lock(*cLock);
1185             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
1186         }
1187         else
1188         {
1189             result = OC_STACK_ERROR;
1190         }
1191
1192         return result;
1193     }
1194
1195 #ifdef WITH_CLOUD
1196     OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
1197                                                                const std::string& host,
1198                                                                const std::vector<std::string>& di,
1199                                                                OCConnectivityType connectivityType,
1200                                                                ObserveCallback& callback)
1201     {
1202         if (!callback)
1203         {
1204             return OC_STACK_INVALID_PARAM;
1205         }
1206         OCStackResult result;
1207
1208         ClientCallbackContext::ObserveContext* ctx =
1209             new ClientCallbackContext::ObserveContext(callback);
1210         OCCallbackData cbdata;
1211         cbdata.context = static_cast<void*>(ctx),
1212         cbdata.cb      = observeResourceCallback;
1213         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1214
1215         auto cLock = m_csdkLock.lock();
1216
1217         if (cLock)
1218         {
1219             std::lock_guard<std::recursive_mutex> lock(*cLock);
1220
1221             std::ostringstream os;
1222             os << host << OC_RSRVD_DEVICE_PRESENCE_URI;
1223             QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
1224             std::string url = assembleSetResourceUri(os.str(), queryParams);
1225
1226             result = OCDoResource(handle, OC_REST_OBSERVE,
1227                                   url.c_str(), nullptr,
1228                                   nullptr, connectivityType,
1229                                   OC_LOW_QOS, &cbdata,
1230                                   nullptr, 0);
1231         }
1232         else
1233         {
1234             delete ctx;
1235             result = OC_STACK_ERROR;
1236         }
1237
1238         return result;
1239     }
1240 #endif
1241
1242     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
1243     {
1244         qos = m_cfg.QoS;
1245         return OC_STACK_OK;
1246     }
1247
1248     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
1249            const HeaderOptions& headerOptions)
1250     {
1251         int i = 0;
1252
1253         if ( headerOptions.size() == 0)
1254         {
1255             return nullptr;
1256         }
1257
1258         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
1259         {
1260             options[i] = OCHeaderOption();
1261             options[i].protocolID = OC_COAP_ID;
1262             options[i].optionID = it->getOptionID();
1263             options[i].optionLength = it->getOptionData().length() + 1;
1264             strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));
1265             i++;
1266         }
1267
1268         return options;
1269     }
1270
1271     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
1272     {
1273         if (!dev)
1274         {
1275             return nullptr;
1276         }
1277
1278         OCDPDev_t* result = new OCDPDev_t(*dev);
1279         result->prm = new OCPrm_t[dev->prmLen];
1280         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
1281         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
1282     }
1283
1284     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
1285     {
1286         while(list)
1287         {
1288             dpList.push_back(cloneDevice(list));
1289             list = list->next;
1290         }
1291     }
1292
1293     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
1294             GetDirectPairedCallback& callback)
1295     {
1296         if (!callback || 0 == waittime)
1297         {
1298             return OC_STACK_INVALID_PARAM;
1299         }
1300
1301         OCStackResult result = OC_STACK_ERROR;
1302         const OCDPDev_t *list = nullptr;
1303         PairedDevices dpDeviceList;
1304
1305         auto cLock = m_csdkLock.lock();
1306
1307         if (cLock)
1308         {
1309             std::lock_guard<std::recursive_mutex> lock(*cLock);
1310
1311             list = OCDiscoverDirectPairingDevices(waittime);
1312             if (NULL == list)
1313             {
1314                 result = OC_STACK_NO_RESOURCE;
1315                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1316                     << std::flush;
1317             }
1318             else {
1319                 convert(list, dpDeviceList);
1320                 std::thread exec(callback, dpDeviceList);
1321                 exec.detach();
1322                 result = OC_STACK_OK;
1323             }
1324         }
1325         else
1326         {
1327             result = OC_STACK_ERROR;
1328         }
1329
1330         return result;
1331     }
1332
1333     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1334     {
1335         if (!callback)
1336         {
1337             return OC_STACK_INVALID_PARAM;
1338         }
1339
1340         OCStackResult result = OC_STACK_ERROR;
1341         const OCDPDev_t *list = nullptr;
1342         PairedDevices dpDeviceList;
1343
1344         auto cLock = m_csdkLock.lock();
1345
1346         if (cLock)
1347         {
1348             std::lock_guard<std::recursive_mutex> lock(*cLock);
1349
1350             list = OCGetDirectPairedDevices();
1351             if (NULL == list)
1352             {
1353                 result = OC_STACK_NO_RESOURCE;
1354                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1355                     << std::flush;
1356             }
1357             else {
1358                 convert(list, dpDeviceList);
1359                 std::thread exec(callback, dpDeviceList);
1360                 exec.detach();
1361                 result = OC_STACK_OK;
1362             }
1363         }
1364         else
1365         {
1366             result = OC_STACK_ERROR;
1367         }
1368
1369         return result;
1370     }
1371
1372     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1373             OCStackResult result)
1374     {
1375
1376         ClientCallbackContext::DirectPairingContext* context =
1377             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1378
1379         std::thread exec(context->callback, cloneDevice(peer), result);
1380         exec.detach();
1381     }
1382
1383     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1384             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1385     {
1386         if (!peer || !callback)
1387         {
1388             oclog() << "Invalid parameters" << std::flush;
1389             return OC_STACK_INVALID_PARAM;
1390         }
1391
1392         OCStackResult result = OC_STACK_ERROR;
1393         ClientCallbackContext::DirectPairingContext* context =
1394             new ClientCallbackContext::DirectPairingContext(callback);
1395
1396         auto cLock = m_csdkLock.lock();
1397         if (cLock)
1398         {
1399             std::lock_guard<std::recursive_mutex> lock(*cLock);
1400             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1401                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1402         }
1403         else
1404         {
1405             delete context;
1406             result = OC_STACK_ERROR;
1407         }
1408         return result;
1409     }
1410 }