Merge branch 'master' into notification-service
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try
169         {
170             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
171                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
172             // loop to ensure valid construction of all resources
173
174             for(auto resource : container.Resources())
175             {
176                 std::thread exec(context->callback, resource);
177                 exec.detach();
178             }
179         }
180         catch (std::exception &e)
181         {
182             oclog() << "Exception in listCallback, ignoring response: "
183                     << e.what() << std::flush;
184         }
185
186
187         return OC_STACK_KEEP_TRANSACTION;
188     }
189
190     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
191         OCClientResponse* clientResponse)
192     {
193         if (!ctx || !clientResponse)
194         {
195             return OC_STACK_KEEP_TRANSACTION;
196         }
197
198         ClientCallbackContext::ListenErrorContext* context =
199             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
200         if (!context)
201         {
202             return OC_STACK_KEEP_TRANSACTION;
203         }
204
205         OCStackResult result = clientResponse->result;
206         if (result == OC_STACK_OK)
207         {
208             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
209             {
210                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
211                     << std::flush;
212                 return OC_STACK_KEEP_TRANSACTION;
213             }
214
215             auto clientWrapper = context->clientWrapper.lock();
216
217             if (!clientWrapper)
218             {
219                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
220                         << std::flush;
221                 return OC_STACK_KEEP_TRANSACTION;
222             }
223
224             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
225                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
226             // loop to ensure valid construction of all resources
227             for (auto resource : container.Resources())
228             {
229                 std::thread exec(context->callback, resource);
230                 exec.detach();
231             }
232             return OC_STACK_KEEP_TRANSACTION;
233         }
234
235         std::string resourceURI = clientResponse->resourceUri;
236         std::thread exec(context->errorCallback, resourceURI, result);
237         exec.detach();
238         return OC_STACK_DELETE_TRANSACTION;
239     }
240
241     OCStackResult InProcClientWrapper::ListenForResource(
242             const std::string& serviceUrl,
243             const std::string& resourceType,
244             OCConnectivityType connectivityType,
245             FindCallback& callback, QualityOfService QoS)
246     {
247         if (!callback)
248         {
249             return OC_STACK_INVALID_PARAM;
250         }
251
252         OCStackResult result;
253         ostringstream resourceUri;
254         resourceUri << serviceUrl << resourceType;
255
256         ClientCallbackContext::ListenContext* context =
257             new ClientCallbackContext::ListenContext(callback, shared_from_this());
258         OCCallbackData cbdata;
259         cbdata.context = static_cast<void*>(context),
260         cbdata.cb      = listenCallback;
261         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
262
263         auto cLock = m_csdkLock.lock();
264         if (cLock)
265         {
266             std::lock_guard<std::recursive_mutex> lock(*cLock);
267             result = OCDoResource(nullptr, OC_REST_DISCOVER,
268                                   resourceUri.str().c_str(),
269                                   nullptr, nullptr, connectivityType,
270                                   static_cast<OCQualityOfService>(QoS),
271                                   &cbdata,
272                                   nullptr, 0);
273         }
274         else
275         {
276             delete context;
277             result = OC_STACK_ERROR;
278         }
279         return result;
280     }
281
282     OCStackResult InProcClientWrapper::ListenErrorForResource(
283             const std::string& serviceUrl,
284             const std::string& resourceType,
285             OCConnectivityType connectivityType,
286             FindCallback& callback, FindErrorCallback& errorCallback,
287             QualityOfService QoS)
288     {
289         if (!callback)
290         {
291             return OC_STACK_INVALID_PARAM;
292         }
293
294         ostringstream resourceUri;
295         resourceUri << serviceUrl << resourceType;
296
297         ClientCallbackContext::ListenErrorContext* context =
298             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
299                                                           shared_from_this());
300         if (!context)
301         {
302             return OC_STACK_ERROR;
303         }
304
305         OCCallbackData cbdata(
306                 static_cast<void*>(context),
307                 listenErrorCallback,
308                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
309             );
310
311         OCStackResult result;
312         auto cLock = m_csdkLock.lock();
313         if (cLock)
314         {
315             std::lock_guard<std::recursive_mutex> lock(*cLock);
316             result = OCDoResource(nullptr, OC_REST_DISCOVER,
317                                   resourceUri.str().c_str(),
318                                   nullptr, nullptr, connectivityType,
319                                   static_cast<OCQualityOfService>(QoS),
320                                   &cbdata,
321                                   nullptr, 0);
322         }
323         else
324         {
325             delete context;
326             result = OC_STACK_ERROR;
327         }
328         return result;
329     }
330 #ifdef WITH_MQ
331     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
332                                               OCClientResponse* clientResponse)
333     {
334         ClientCallbackContext::MQTopicContext* context =
335             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
336
337         if (!clientResponse || !context)
338         {
339             return OC_STACK_DELETE_TRANSACTION;
340         }
341
342         if (clientResponse->result != OC_STACK_OK)
343         {
344             oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
345                     << clientResponse->result
346                     << std::flush;
347
348             std::thread exec(context->callback, clientResponse->result,
349                              std::string(clientResponse->resourceUri), nullptr);
350             exec.detach();
351
352             return OC_STACK_DELETE_TRANSACTION;
353         }
354
355         auto clientWrapper = context->clientWrapper.lock();
356         if (!clientWrapper)
357         {
358             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
359                     << std::flush;
360             return OC_STACK_DELETE_TRANSACTION;
361         }
362
363         try
364         {
365             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
366                                         (OCRepPayload *) clientResponse->payload);
367
368             // loop to ensure valid construction of all resources
369             for (auto resource : container.Resources())
370             {
371                 std::thread exec(context->callback, clientResponse->result,
372                                  std::string(clientResponse->resourceUri), resource);
373                 exec.detach();
374             }
375         }
376         catch (std::exception &e)
377         {
378             oclog() << "Exception in listCallback, ignoring response: "
379                     << e.what() << std::flush;
380         }
381
382         return OC_STACK_DELETE_TRANSACTION;
383     }
384
385     OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
386                                                         const std::string& resourceUri,
387                                                         const QueryParamsMap& queryParams,
388                                                         const HeaderOptions& headerOptions,
389                                                         MQTopicCallback& callback,
390                                                         QualityOfService QoS)
391     {
392         oclog() << "ListenForMQTopic()" << std::flush;
393
394         if (!callback)
395         {
396             return OC_STACK_INVALID_PARAM;
397         }
398
399         ClientCallbackContext::MQTopicContext* context =
400             new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
401         OCCallbackData cbdata;
402         cbdata.context = static_cast<void*>(context),
403         cbdata.cb      = listenMQCallback;
404         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
405
406         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
407
408         OCStackResult result = OC_STACK_ERROR;
409         auto cLock = m_csdkLock.lock();
410         if (cLock)
411         {
412             std::lock_guard<std::recursive_mutex> lock(*cLock);
413             OCHeaderOption options[MAX_HEADER_OPTIONS];
414             result = OCDoResource(
415                                   nullptr, OC_REST_GET,
416                                   uri.c_str(),
417                                   &devAddr, nullptr,
418                                   CT_DEFAULT,
419                                   static_cast<OCQualityOfService>(QoS),
420                                   &cbdata,
421                                   assembleHeaderOptions(options, headerOptions),
422                                   headerOptions.size());
423         }
424         else
425         {
426             delete context;
427         }
428
429         return result;
430     }
431 #endif
432
433     OCStackApplicationResult listenDeviceCallback(void* ctx,
434                                                   OCDoHandle /*handle*/,
435             OCClientResponse* clientResponse)
436     {
437         ClientCallbackContext::DeviceListenContext* context =
438             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
439
440         try
441         {
442             OCRepresentation rep = parseGetSetCallback(clientResponse);
443             std::thread exec(context->callback, rep);
444             exec.detach();
445         }
446         catch(OC::OCException& e)
447         {
448             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
449                 <<e.what() <<std::flush;
450         }
451
452         return OC_STACK_KEEP_TRANSACTION;
453     }
454
455     OCStackResult InProcClientWrapper::ListenForDevice(
456             const std::string& serviceUrl,
457             const std::string& deviceURI,
458             OCConnectivityType connectivityType,
459             FindDeviceCallback& callback,
460             QualityOfService QoS)
461     {
462         if (!callback)
463         {
464             return OC_STACK_INVALID_PARAM;
465         }
466         OCStackResult result;
467         ostringstream deviceUri;
468         deviceUri << serviceUrl << deviceURI;
469
470         ClientCallbackContext::DeviceListenContext* context =
471             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
472         OCCallbackData cbdata;
473
474         cbdata.context = static_cast<void*>(context),
475         cbdata.cb      = listenDeviceCallback;
476         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
477
478         auto cLock = m_csdkLock.lock();
479         if (cLock)
480         {
481             std::lock_guard<std::recursive_mutex> lock(*cLock);
482             result = OCDoResource(nullptr, OC_REST_DISCOVER,
483                                   deviceUri.str().c_str(),
484                                   nullptr, nullptr, connectivityType,
485                                   static_cast<OCQualityOfService>(QoS),
486                                   &cbdata,
487                                   nullptr, 0);
488         }
489         else
490         {
491             delete context;
492             result = OC_STACK_ERROR;
493         }
494         return result;
495     }
496
497     void parseServerHeaderOptions(OCClientResponse* clientResponse,
498                     HeaderOptions& serverHeaderOptions)
499     {
500         if (clientResponse)
501         {
502             // Parse header options from server
503             uint16_t optionID;
504             std::string optionData;
505
506             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
507             {
508                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
509                 optionData = reinterpret_cast<const char*>
510                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
511                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
512                 serverHeaderOptions.push_back(headerOption);
513             }
514         }
515         else
516         {
517             // clientResponse is invalid
518             // TODO check proper logging
519             std::cout << " Invalid response " << std::endl;
520         }
521     }
522
523 #ifdef WITH_MQ
524     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
525                     OCClientResponse* clientResponse)
526     {
527         ClientCallbackContext::MQTopicContext* context =
528             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
529         HeaderOptions serverHeaderOptions;
530
531         if (!clientResponse || !context)
532         {
533             return OC_STACK_DELETE_TRANSACTION;
534         }
535
536         std::string createdUri;
537         bool isLocationOption = false;
538         OCStackResult result = clientResponse->result;
539         if (OC_STACK_OK               == result ||
540             OC_STACK_RESOURCE_CREATED == result)
541         {
542             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
543
544             for (auto headerOption : serverHeaderOptions)
545             {
546                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
547                 {
548                     createdUri += "/";
549                     createdUri += headerOption.getOptionData();
550                     if (!isLocationOption)
551                     {
552                         isLocationOption = true;
553                     }
554                 }
555             }
556         }
557
558         if (!isLocationOption)
559         {
560             createdUri = std::string(clientResponse->resourceUri);
561         }
562
563         auto clientWrapper = context->clientWrapper.lock();
564
565         if (!clientWrapper)
566         {
567             oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
568                     << std::flush;
569             return OC_STACK_DELETE_TRANSACTION;
570         }
571
572         try
573         {
574             if (OC_STACK_OK               == result ||
575                 OC_STACK_RESOURCE_CREATED == result)
576             {
577                 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
578                                             createdUri);
579                 for (auto resource : container.Resources())
580                 {
581                     std::thread exec(context->callback, result,
582                                      createdUri,
583                                      resource);
584                     exec.detach();
585                 }
586             }
587             else
588             {
589                 std::thread exec(context->callback, result,
590                                  createdUri,
591                                  nullptr);
592                 exec.detach();
593             }
594         }
595         catch (std::exception &e)
596         {
597             oclog() << "Exception in createMQTopicCallback, ignoring response: "
598                     << e.what() << std::flush;
599         }
600         return OC_STACK_DELETE_TRANSACTION;
601     }
602
603     OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
604                 const OCDevAddr& devAddr,
605                 const std::string& uri,
606                 const OCRepresentation& rep,
607                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
608                 MQTopicCallback& callback, QualityOfService QoS)
609     {
610         if (!callback)
611         {
612             return OC_STACK_INVALID_PARAM;
613         }
614         OCStackResult result;
615         ClientCallbackContext::MQTopicContext* ctx =
616                 new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
617         OCCallbackData cbdata;
618         cbdata.context = static_cast<void*>(ctx),
619         cbdata.cb      = createMQTopicCallback;
620         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
621
622         std::string url = assembleSetResourceUri(uri, queryParams);
623
624         auto cLock = m_csdkLock.lock();
625
626         if (cLock)
627         {
628             std::lock_guard<std::recursive_mutex> lock(*cLock);
629             OCHeaderOption options[MAX_HEADER_OPTIONS];
630
631             result = OCDoResource(nullptr, OC_REST_PUT,
632                                   url.c_str(), &devAddr,
633                                   assembleSetResourcePayload(rep),
634                                   CT_DEFAULT,
635                                   static_cast<OCQualityOfService>(QoS),
636                                   &cbdata,
637                                   assembleHeaderOptions(options, headerOptions),
638                                   headerOptions.size());
639         }
640         else
641         {
642             delete ctx;
643             result = OC_STACK_ERROR;
644         }
645
646         return result;
647     }
648 #endif
649     OCStackApplicationResult getResourceCallback(void* ctx,
650                                                  OCDoHandle /*handle*/,
651         OCClientResponse* clientResponse)
652     {
653         ClientCallbackContext::GetContext* context =
654             static_cast<ClientCallbackContext::GetContext*>(ctx);
655
656         OCRepresentation rep;
657         HeaderOptions serverHeaderOptions;
658         OCStackResult result = clientResponse->result;
659         if (result == OC_STACK_OK)
660         {
661             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
662             try
663             {
664                 rep = parseGetSetCallback(clientResponse);
665             }
666             catch(OC::OCException& e)
667             {
668                 result = e.code();
669             }
670         }
671
672         std::thread exec(context->callback, serverHeaderOptions, rep, result);
673         exec.detach();
674         return OC_STACK_DELETE_TRANSACTION;
675     }
676
677     OCStackResult InProcClientWrapper::GetResourceRepresentation(
678         const OCDevAddr& devAddr,
679         const std::string& resourceUri,
680         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
681         OCConnectivityType connectivityType,
682         GetCallback& callback, QualityOfService QoS)
683     {
684         if (!callback)
685         {
686             return OC_STACK_INVALID_PARAM;
687         }
688         OCStackResult result;
689         ClientCallbackContext::GetContext* ctx =
690             new ClientCallbackContext::GetContext(callback);
691         OCCallbackData cbdata;
692         cbdata.context = static_cast<void*>(ctx),
693         cbdata.cb      = getResourceCallback;
694         cbdata.cd      = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
695
696
697         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
698
699         auto cLock = m_csdkLock.lock();
700
701         if (cLock)
702         {
703             std::lock_guard<std::recursive_mutex> lock(*cLock);
704             OCHeaderOption options[MAX_HEADER_OPTIONS];
705
706             result = OCDoResource(
707                                   nullptr, OC_REST_GET,
708                                   uri.c_str(),
709                                   &devAddr, nullptr,
710                                   connectivityType,
711                                   static_cast<OCQualityOfService>(QoS),
712                                   &cbdata,
713                                   assembleHeaderOptions(options, headerOptions),
714                                   headerOptions.size());
715         }
716         else
717         {
718             delete ctx;
719             result = OC_STACK_ERROR;
720         }
721         return result;
722     }
723
724
725     OCStackApplicationResult setResourceCallback(void* ctx,
726                                                  OCDoHandle /*handle*/,
727         OCClientResponse* clientResponse)
728     {
729         ClientCallbackContext::SetContext* context =
730             static_cast<ClientCallbackContext::SetContext*>(ctx);
731         OCRepresentation attrs;
732         HeaderOptions serverHeaderOptions;
733
734         OCStackResult result = clientResponse->result;
735         if (OC_STACK_OK               == result ||
736             OC_STACK_RESOURCE_CREATED == result ||
737             OC_STACK_RESOURCE_DELETED == result ||
738             OC_STACK_RESOURCE_CHANGED == result)
739         {
740             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
741             try
742             {
743                 attrs = parseGetSetCallback(clientResponse);
744             }
745             catch(OC::OCException& e)
746             {
747                 result = e.code();
748             }
749         }
750
751         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
752         exec.detach();
753         return OC_STACK_DELETE_TRANSACTION;
754     }
755
756     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
757         const QueryParamsMap& queryParams)
758     {
759         if (!uri.empty())
760         {
761             if (uri.back() == '/')
762             {
763                 uri.resize(uri.size() - 1);
764             }
765         }
766
767         ostringstream paramsList;
768         if (queryParams.size() > 0)
769         {
770             paramsList << '?';
771         }
772
773         for (auto& param : queryParams)
774         {
775             paramsList << param.first <<'='<<param.second<<';';
776         }
777
778         std::string queryString = paramsList.str();
779
780         if (queryString.empty())
781         {
782             return uri;
783         }
784
785         if (queryString.back() == ';')
786         {
787             queryString.resize(queryString.size() - 1);
788         }
789
790         std::string ret = uri + queryString;
791         return ret;
792     }
793
794     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
795         const QueryParamsList& queryParams)
796     {
797         if (!uri.empty())
798         {
799             if (uri.back() == '/')
800             {
801                 uri.resize(uri.size() - 1);
802             }
803         }
804
805         ostringstream paramsList;
806         if (queryParams.size() > 0)
807         {
808             paramsList << '?';
809         }
810
811         for (auto& param : queryParams)
812         {
813             for (auto& paramList : param.second)
814             {
815                 paramsList << param.first << '=' << paramList << ';';
816             }
817         }
818
819         std::string queryString = paramsList.str();
820
821         if (queryString.empty())
822         {
823             return uri;
824         }
825
826         if (queryString.back() == ';')
827         {
828             queryString.resize(queryString.size() - 1);
829         }
830
831         std::string ret = uri + queryString;
832         return ret;
833     }
834
835     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
836     {
837         MessageContainer ocInfo;
838         ocInfo.addRepresentation(rep);
839         for(const OCRepresentation& r : rep.getChildren())
840         {
841             ocInfo.addRepresentation(r);
842         }
843
844         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
845     }
846
847     OCStackResult InProcClientWrapper::PostResourceRepresentation(
848         const OCDevAddr& devAddr,
849         const std::string& uri,
850         const OCRepresentation& rep,
851         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
852         OCConnectivityType connectivityType,
853         PostCallback& callback, QualityOfService QoS)
854     {
855         if (!callback)
856         {
857             return OC_STACK_INVALID_PARAM;
858         }
859         OCStackResult result;
860         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
861         OCCallbackData cbdata;
862         cbdata.context = static_cast<void*>(ctx),
863         cbdata.cb      = setResourceCallback;
864         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
865
866
867         std::string url = assembleSetResourceUri(uri, queryParams);
868
869         auto cLock = m_csdkLock.lock();
870
871         if (cLock)
872         {
873             std::lock_guard<std::recursive_mutex> lock(*cLock);
874             OCHeaderOption options[MAX_HEADER_OPTIONS];
875
876             result = OCDoResource(nullptr, OC_REST_POST,
877                                   url.c_str(), &devAddr,
878                                   assembleSetResourcePayload(rep),
879                                   connectivityType,
880                                   static_cast<OCQualityOfService>(QoS),
881                                   &cbdata,
882                                   assembleHeaderOptions(options, headerOptions),
883                                   headerOptions.size());
884         }
885         else
886         {
887             delete ctx;
888             result = OC_STACK_ERROR;
889         }
890
891         return result;
892     }
893
894     OCStackResult InProcClientWrapper::PutResourceRepresentation(
895         const OCDevAddr& devAddr,
896         const std::string& uri,
897         const OCRepresentation& rep,
898         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
899         PutCallback& callback, QualityOfService QoS)
900     {
901         if (!callback)
902         {
903             return OC_STACK_INVALID_PARAM;
904         }
905         OCStackResult result;
906         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
907         OCCallbackData cbdata;
908         cbdata.context = static_cast<void*>(ctx),
909         cbdata.cb      = setResourceCallback;
910         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
911
912
913         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
914
915         auto cLock = m_csdkLock.lock();
916
917         if (cLock)
918         {
919             std::lock_guard<std::recursive_mutex> lock(*cLock);
920             OCDoHandle handle;
921             OCHeaderOption options[MAX_HEADER_OPTIONS];
922
923             result = OCDoResource(&handle, OC_REST_PUT,
924                                   url.c_str(), &devAddr,
925                                   assembleSetResourcePayload(rep),
926                                   CT_DEFAULT,
927                                   static_cast<OCQualityOfService>(QoS),
928                                   &cbdata,
929                                   assembleHeaderOptions(options, headerOptions),
930                                   headerOptions.size());
931         }
932         else
933         {
934             delete ctx;
935             result = OC_STACK_ERROR;
936         }
937
938         return result;
939     }
940
941     OCStackApplicationResult deleteResourceCallback(void* ctx,
942                                                     OCDoHandle /*handle*/,
943         OCClientResponse* clientResponse)
944     {
945         ClientCallbackContext::DeleteContext* context =
946             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
947         HeaderOptions serverHeaderOptions;
948
949         if (clientResponse->result == OC_STACK_OK)
950         {
951             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
952         }
953         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
954         exec.detach();
955         return OC_STACK_DELETE_TRANSACTION;
956     }
957
958     OCStackResult InProcClientWrapper::DeleteResource(
959         const OCDevAddr& devAddr,
960         const std::string& uri,
961         const HeaderOptions& headerOptions,
962         OCConnectivityType connectivityType,
963         DeleteCallback& callback,
964         QualityOfService /*QoS*/)
965     {
966         if (!callback)
967         {
968             return OC_STACK_INVALID_PARAM;
969         }
970         OCStackResult result;
971         ClientCallbackContext::DeleteContext* ctx =
972             new ClientCallbackContext::DeleteContext(callback);
973         OCCallbackData cbdata;
974         cbdata.context = static_cast<void*>(ctx),
975         cbdata.cb      = deleteResourceCallback;
976         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
977
978
979         auto cLock = m_csdkLock.lock();
980
981         if (cLock)
982         {
983             OCHeaderOption options[MAX_HEADER_OPTIONS];
984
985             std::lock_guard<std::recursive_mutex> lock(*cLock);
986
987             result = OCDoResource(nullptr, OC_REST_DELETE,
988                                   uri.c_str(), &devAddr,
989                                   nullptr,
990                                   connectivityType,
991                                   static_cast<OCQualityOfService>(m_cfg.QoS),
992                                   &cbdata,
993                                   assembleHeaderOptions(options, headerOptions),
994                                   headerOptions.size());
995         }
996         else
997         {
998             delete ctx;
999             result = OC_STACK_ERROR;
1000         }
1001
1002         return result;
1003     }
1004
1005     OCStackApplicationResult observeResourceCallback(void* ctx,
1006                                                      OCDoHandle /*handle*/,
1007         OCClientResponse* clientResponse)
1008     {
1009         ClientCallbackContext::ObserveContext* context =
1010             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
1011         OCRepresentation attrs;
1012         HeaderOptions serverHeaderOptions;
1013         uint32_t sequenceNumber = clientResponse->sequenceNumber;
1014         OCStackResult result = clientResponse->result;
1015         if (clientResponse->result == OC_STACK_OK)
1016         {
1017             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
1018             try
1019             {
1020                 attrs = parseGetSetCallback(clientResponse);
1021             }
1022             catch(OC::OCException& e)
1023             {
1024                 result = e.code();
1025             }
1026         }
1027         std::thread exec(context->callback, serverHeaderOptions, attrs,
1028                     result, sequenceNumber);
1029         exec.detach();
1030
1031         return OC_STACK_KEEP_TRANSACTION;
1032     }
1033
1034     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
1035         const OCDevAddr& devAddr,
1036         const std::string& uri,
1037         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
1038         ObserveCallback& callback, QualityOfService QoS)
1039     {
1040         if (!callback)
1041         {
1042             return OC_STACK_INVALID_PARAM;
1043         }
1044         OCStackResult result;
1045
1046         ClientCallbackContext::ObserveContext* ctx =
1047             new ClientCallbackContext::ObserveContext(callback);
1048         OCCallbackData cbdata;
1049         cbdata.context = static_cast<void*>(ctx),
1050         cbdata.cb      = observeResourceCallback;
1051         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1052
1053
1054         OCMethod method;
1055         if (observeType == ObserveType::Observe)
1056         {
1057             method = OC_REST_OBSERVE;
1058         }
1059         else if (observeType == ObserveType::ObserveAll)
1060         {
1061             method = OC_REST_OBSERVE_ALL;
1062         }
1063         else
1064         {
1065             method = OC_REST_OBSERVE_ALL;
1066         }
1067
1068         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
1069
1070         auto cLock = m_csdkLock.lock();
1071
1072         if (cLock)
1073         {
1074             std::lock_guard<std::recursive_mutex> lock(*cLock);
1075             OCHeaderOption options[MAX_HEADER_OPTIONS];
1076
1077             result = OCDoResource(handle, method,
1078                                   url.c_str(), &devAddr,
1079                                   nullptr,
1080                                   CT_DEFAULT,
1081                                   static_cast<OCQualityOfService>(QoS),
1082                                   &cbdata,
1083                                   assembleHeaderOptions(options, headerOptions),
1084                                   headerOptions.size());
1085         }
1086         else
1087         {
1088             delete ctx;
1089             return OC_STACK_ERROR;
1090         }
1091
1092         return result;
1093     }
1094
1095     OCStackResult InProcClientWrapper::CancelObserveResource(
1096             OCDoHandle handle,
1097             const std::string& /*host*/,
1098             const std::string& /*uri*/,
1099             const HeaderOptions& headerOptions,
1100             QualityOfService QoS)
1101     {
1102         OCStackResult result;
1103         auto cLock = m_csdkLock.lock();
1104
1105         if (cLock)
1106         {
1107             std::lock_guard<std::recursive_mutex> lock(*cLock);
1108             OCHeaderOption options[MAX_HEADER_OPTIONS];
1109
1110             result = OCCancel(handle,
1111                     static_cast<OCQualityOfService>(QoS),
1112                     assembleHeaderOptions(options, headerOptions),
1113                     headerOptions.size());
1114         }
1115         else
1116         {
1117             result = OC_STACK_ERROR;
1118         }
1119
1120         return result;
1121     }
1122
1123     OCStackApplicationResult subscribePresenceCallback(void* ctx,
1124                                                        OCDoHandle /*handle*/,
1125             OCClientResponse* clientResponse)
1126     {
1127         ClientCallbackContext::SubscribePresenceContext* context =
1128         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
1129
1130         /*
1131          * This a hack while we rethink presence subscription.
1132          */
1133         std::string url = clientResponse->devAddr.addr;
1134
1135         std::thread exec(context->callback, clientResponse->result,
1136                     clientResponse->sequenceNumber, url);
1137
1138         exec.detach();
1139
1140         return OC_STACK_KEEP_TRANSACTION;
1141     }
1142
1143     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
1144         const std::string& host, const std::string& resourceType,
1145         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
1146     {
1147         if (!presenceHandler)
1148         {
1149             return OC_STACK_INVALID_PARAM;
1150         }
1151
1152         ClientCallbackContext::SubscribePresenceContext* ctx =
1153             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
1154         OCCallbackData cbdata;
1155         cbdata.context = static_cast<void*>(ctx),
1156         cbdata.cb      = subscribePresenceCallback;
1157         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
1158
1159
1160         auto cLock = m_csdkLock.lock();
1161
1162         std::ostringstream os;
1163         os << host << OC_RSRVD_PRESENCE_URI;
1164
1165         if (!resourceType.empty())
1166         {
1167             os << "?rt=" << resourceType;
1168         }
1169
1170         if (!cLock)
1171         {
1172             delete ctx;
1173             return OC_STACK_ERROR;
1174         }
1175
1176         return OCDoResource(handle, OC_REST_PRESENCE,
1177                             os.str().c_str(), nullptr,
1178                             nullptr, connectivityType,
1179                             OC_LOW_QOS, &cbdata, NULL, 0);
1180     }
1181
1182     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
1183     {
1184         OCStackResult result;
1185         auto cLock = m_csdkLock.lock();
1186
1187         if (cLock)
1188         {
1189             std::lock_guard<std::recursive_mutex> lock(*cLock);
1190             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
1191         }
1192         else
1193         {
1194             result = OC_STACK_ERROR;
1195         }
1196
1197         return result;
1198     }
1199
1200 #ifdef WITH_CLOUD
1201     OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
1202                                                                const std::string& host,
1203                                                                const std::vector<std::string>& di,
1204                                                                OCConnectivityType connectivityType,
1205                                                                ObserveCallback& callback)
1206     {
1207         if (!callback)
1208         {
1209             return OC_STACK_INVALID_PARAM;
1210         }
1211         OCStackResult result;
1212
1213         ClientCallbackContext::ObserveContext* ctx =
1214             new ClientCallbackContext::ObserveContext(callback);
1215         OCCallbackData cbdata;
1216         cbdata.context = static_cast<void*>(ctx),
1217         cbdata.cb      = observeResourceCallback;
1218         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1219
1220         auto cLock = m_csdkLock.lock();
1221
1222         if (cLock)
1223         {
1224             std::lock_guard<std::recursive_mutex> lock(*cLock);
1225
1226             std::ostringstream os;
1227             os << host << OC_RSRVD_DEVICE_PRESENCE_URI;
1228             QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
1229             std::string url = assembleSetResourceUri(os.str(), queryParams);
1230
1231             result = OCDoResource(handle, OC_REST_OBSERVE,
1232                                   url.c_str(), nullptr,
1233                                   nullptr, connectivityType,
1234                                   OC_LOW_QOS, &cbdata,
1235                                   nullptr, 0);
1236         }
1237         else
1238         {
1239             delete ctx;
1240             result = OC_STACK_ERROR;
1241         }
1242
1243         return result;
1244     }
1245 #endif
1246
1247     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
1248     {
1249         qos = m_cfg.QoS;
1250         return OC_STACK_OK;
1251     }
1252
1253     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
1254            const HeaderOptions& headerOptions)
1255     {
1256         int i = 0;
1257
1258         if ( headerOptions.size() == 0)
1259         {
1260             return nullptr;
1261         }
1262
1263         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
1264         {
1265             options[i] = OCHeaderOption();
1266             options[i].protocolID = OC_COAP_ID;
1267             options[i].optionID = it->getOptionID();
1268             options[i].optionLength = it->getOptionData().length() + 1;
1269             strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));
1270             i++;
1271         }
1272
1273         return options;
1274     }
1275
1276     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
1277     {
1278         if (!dev)
1279         {
1280             return nullptr;
1281         }
1282
1283         OCDPDev_t* result = new OCDPDev_t(*dev);
1284         result->prm = new OCPrm_t[dev->prmLen];
1285         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
1286         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
1287     }
1288
1289     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
1290     {
1291         while(list)
1292         {
1293             dpList.push_back(cloneDevice(list));
1294             list = list->next;
1295         }
1296     }
1297
1298     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
1299             GetDirectPairedCallback& callback)
1300     {
1301         if (!callback || 0 == waittime)
1302         {
1303             return OC_STACK_INVALID_PARAM;
1304         }
1305
1306         OCStackResult result = OC_STACK_ERROR;
1307         const OCDPDev_t *list = nullptr;
1308         PairedDevices dpDeviceList;
1309
1310         auto cLock = m_csdkLock.lock();
1311
1312         if (cLock)
1313         {
1314             std::lock_guard<std::recursive_mutex> lock(*cLock);
1315
1316             list = OCDiscoverDirectPairingDevices(waittime);
1317             if (NULL == list)
1318             {
1319                 result = OC_STACK_NO_RESOURCE;
1320                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1321                     << std::flush;
1322             }
1323             else {
1324                 convert(list, dpDeviceList);
1325                 std::thread exec(callback, dpDeviceList);
1326                 exec.detach();
1327                 result = OC_STACK_OK;
1328             }
1329         }
1330         else
1331         {
1332             result = OC_STACK_ERROR;
1333         }
1334
1335         return result;
1336     }
1337
1338     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1339     {
1340         if (!callback)
1341         {
1342             return OC_STACK_INVALID_PARAM;
1343         }
1344
1345         OCStackResult result = OC_STACK_ERROR;
1346         const OCDPDev_t *list = nullptr;
1347         PairedDevices dpDeviceList;
1348
1349         auto cLock = m_csdkLock.lock();
1350
1351         if (cLock)
1352         {
1353             std::lock_guard<std::recursive_mutex> lock(*cLock);
1354
1355             list = OCGetDirectPairedDevices();
1356             if (NULL == list)
1357             {
1358                 result = OC_STACK_NO_RESOURCE;
1359                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1360                     << std::flush;
1361             }
1362             else {
1363                 convert(list, dpDeviceList);
1364                 std::thread exec(callback, dpDeviceList);
1365                 exec.detach();
1366                 result = OC_STACK_OK;
1367             }
1368         }
1369         else
1370         {
1371             result = OC_STACK_ERROR;
1372         }
1373
1374         return result;
1375     }
1376
1377     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1378             OCStackResult result)
1379     {
1380
1381         ClientCallbackContext::DirectPairingContext* context =
1382             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1383
1384         std::thread exec(context->callback, cloneDevice(peer), result);
1385         exec.detach();
1386     }
1387
1388     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1389             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1390     {
1391         if (!peer || !callback)
1392         {
1393             oclog() << "Invalid parameters" << std::flush;
1394             return OC_STACK_INVALID_PARAM;
1395         }
1396
1397         OCStackResult result = OC_STACK_ERROR;
1398         ClientCallbackContext::DirectPairingContext* context =
1399             new ClientCallbackContext::DirectPairingContext(callback);
1400
1401         auto cLock = m_csdkLock.lock();
1402         if (cLock)
1403         {
1404             std::lock_guard<std::recursive_mutex> lock(*cLock);
1405             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1406                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1407         }
1408         else
1409         {
1410             delete context;
1411             result = OC_STACK_ERROR;
1412         }
1413         return result;
1414     }
1415 }