Merge branch 'notification-service'
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try
169         {
170             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
171                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
172             // loop to ensure valid construction of all resources
173
174             for(auto resource : container.Resources())
175             {
176                 std::thread exec(context->callback, resource);
177                 exec.detach();
178             }
179         }
180         catch (std::exception &e)
181         {
182             oclog() << "Exception in listCallback, ignoring response: "
183                     << e.what() << std::flush;
184         }
185
186
187         return OC_STACK_KEEP_TRANSACTION;
188     }
189
190     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
191         OCClientResponse* clientResponse)
192     {
193         if (!ctx || !clientResponse)
194         {
195             return OC_STACK_KEEP_TRANSACTION;
196         }
197
198         ClientCallbackContext::ListenErrorContext* context =
199             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
200         if (!context)
201         {
202             return OC_STACK_KEEP_TRANSACTION;
203         }
204
205         OCStackResult result = clientResponse->result;
206         if (result == OC_STACK_OK)
207         {
208             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
209             {
210                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
211                     << std::flush;
212                 return OC_STACK_KEEP_TRANSACTION;
213             }
214
215             auto clientWrapper = context->clientWrapper.lock();
216
217             if (!clientWrapper)
218             {
219                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
220                         << std::flush;
221                 return OC_STACK_KEEP_TRANSACTION;
222             }
223
224             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
225                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
226             // loop to ensure valid construction of all resources
227             for (auto resource : container.Resources())
228             {
229                 std::thread exec(context->callback, resource);
230                 exec.detach();
231             }
232             return OC_STACK_KEEP_TRANSACTION;
233         }
234
235         std::string resourceURI = clientResponse->resourceUri;
236         std::thread exec(context->errorCallback, resourceURI, result);
237         exec.detach();
238         return OC_STACK_DELETE_TRANSACTION;
239     }
240
241     OCStackResult InProcClientWrapper::ListenForResource(
242             const std::string& serviceUrl,
243             const std::string& resourceType,
244             OCConnectivityType connectivityType,
245             FindCallback& callback, QualityOfService QoS)
246     {
247         if (!callback)
248         {
249             return OC_STACK_INVALID_PARAM;
250         }
251
252         OCStackResult result;
253         ostringstream resourceUri;
254         resourceUri << serviceUrl << resourceType;
255
256         ClientCallbackContext::ListenContext* context =
257             new ClientCallbackContext::ListenContext(callback, shared_from_this());
258         OCCallbackData cbdata;
259         cbdata.context = static_cast<void*>(context),
260         cbdata.cb      = listenCallback;
261         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
262
263         auto cLock = m_csdkLock.lock();
264         if (cLock)
265         {
266             std::lock_guard<std::recursive_mutex> lock(*cLock);
267             result = OCDoResource(nullptr, OC_REST_DISCOVER,
268                                   resourceUri.str().c_str(),
269                                   nullptr, nullptr, connectivityType,
270                                   static_cast<OCQualityOfService>(QoS),
271                                   &cbdata,
272                                   nullptr, 0);
273         }
274         else
275         {
276             delete context;
277             result = OC_STACK_ERROR;
278         }
279         return result;
280     }
281
282     OCStackResult InProcClientWrapper::ListenErrorForResource(
283             const std::string& serviceUrl,
284             const std::string& resourceType,
285             OCConnectivityType connectivityType,
286             FindCallback& callback, FindErrorCallback& errorCallback,
287             QualityOfService QoS)
288     {
289         if (!callback)
290         {
291             return OC_STACK_INVALID_PARAM;
292         }
293
294         ostringstream resourceUri;
295         resourceUri << serviceUrl << resourceType;
296
297         ClientCallbackContext::ListenErrorContext* context =
298             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
299                                                           shared_from_this());
300         if (!context)
301         {
302             return OC_STACK_ERROR;
303         }
304
305         OCCallbackData cbdata(
306                 static_cast<void*>(context),
307                 listenErrorCallback,
308                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
309             );
310
311         OCStackResult result;
312         auto cLock = m_csdkLock.lock();
313         if (cLock)
314         {
315             std::lock_guard<std::recursive_mutex> lock(*cLock);
316             result = OCDoResource(nullptr, OC_REST_DISCOVER,
317                                   resourceUri.str().c_str(),
318                                   nullptr, nullptr, connectivityType,
319                                   static_cast<OCQualityOfService>(QoS),
320                                   &cbdata,
321                                   nullptr, 0);
322         }
323         else
324         {
325             delete context;
326             result = OC_STACK_ERROR;
327         }
328         return result;
329     }
330 #ifdef WITH_MQ
331     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
332                                               OCClientResponse* clientResponse)
333     {
334         ClientCallbackContext::MQTopicContext* context =
335             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
336
337         if (!clientResponse || !context)
338         {
339             return OC_STACK_DELETE_TRANSACTION;
340         }
341
342         std::string resourceURI = clientResponse->resourceUri;
343         if (clientResponse->result != OC_STACK_OK)
344         {
345             oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
346                     << clientResponse->result
347                     << std::flush;
348
349             std::thread exec(context->callback, clientResponse->result,
350                              resourceURI, nullptr);
351             exec.detach();
352
353             return OC_STACK_DELETE_TRANSACTION;
354         }
355
356         auto clientWrapper = context->clientWrapper.lock();
357         if (!clientWrapper)
358         {
359             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
360                     << std::flush;
361             return OC_STACK_DELETE_TRANSACTION;
362         }
363
364         try
365         {
366             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
367                                         (OCRepPayload *) clientResponse->payload);
368
369             // loop to ensure valid construction of all resources
370             for (auto resource : container.Resources())
371             {
372                 std::thread exec(context->callback, clientResponse->result,
373                                  resourceURI, resource);
374                 exec.detach();
375             }
376         }
377         catch (std::exception &e)
378         {
379             oclog() << "Exception in listCallback, ignoring response: "
380                     << e.what() << std::flush;
381         }
382
383         return OC_STACK_DELETE_TRANSACTION;
384     }
385
386     OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
387                                                         const std::string& resourceUri,
388                                                         const QueryParamsMap& queryParams,
389                                                         const HeaderOptions& headerOptions,
390                                                         MQTopicCallback& callback,
391                                                         QualityOfService QoS)
392     {
393         oclog() << "ListenForMQTopic()" << std::flush;
394
395         if (!callback)
396         {
397             return OC_STACK_INVALID_PARAM;
398         }
399
400         ClientCallbackContext::MQTopicContext* context =
401             new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
402         OCCallbackData cbdata;
403         cbdata.context = static_cast<void*>(context),
404         cbdata.cb      = listenMQCallback;
405         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
406
407         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
408
409         OCStackResult result = OC_STACK_ERROR;
410         auto cLock = m_csdkLock.lock();
411         if (cLock)
412         {
413             std::lock_guard<std::recursive_mutex> lock(*cLock);
414             OCHeaderOption options[MAX_HEADER_OPTIONS];
415             result = OCDoResource(
416                                   nullptr, OC_REST_GET,
417                                   uri.c_str(),
418                                   &devAddr, nullptr,
419                                   CT_DEFAULT,
420                                   static_cast<OCQualityOfService>(QoS),
421                                   &cbdata,
422                                   assembleHeaderOptions(options, headerOptions),
423                                   headerOptions.size());
424         }
425         else
426         {
427             delete context;
428         }
429
430         return result;
431     }
432 #endif
433
434     OCStackApplicationResult listenDeviceCallback(void* ctx,
435                                                   OCDoHandle /*handle*/,
436             OCClientResponse* clientResponse)
437     {
438         ClientCallbackContext::DeviceListenContext* context =
439             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
440
441         try
442         {
443             OCRepresentation rep = parseGetSetCallback(clientResponse);
444             std::thread exec(context->callback, rep);
445             exec.detach();
446         }
447         catch(OC::OCException& e)
448         {
449             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
450                 <<e.what() <<std::flush;
451         }
452
453         return OC_STACK_KEEP_TRANSACTION;
454     }
455
456     OCStackResult InProcClientWrapper::ListenForDevice(
457             const std::string& serviceUrl,
458             const std::string& deviceURI,
459             OCConnectivityType connectivityType,
460             FindDeviceCallback& callback,
461             QualityOfService QoS)
462     {
463         if (!callback)
464         {
465             return OC_STACK_INVALID_PARAM;
466         }
467         OCStackResult result;
468         ostringstream deviceUri;
469         deviceUri << serviceUrl << deviceURI;
470
471         ClientCallbackContext::DeviceListenContext* context =
472             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
473         OCCallbackData cbdata;
474
475         cbdata.context = static_cast<void*>(context),
476         cbdata.cb      = listenDeviceCallback;
477         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
478
479         auto cLock = m_csdkLock.lock();
480         if (cLock)
481         {
482             std::lock_guard<std::recursive_mutex> lock(*cLock);
483             result = OCDoResource(nullptr, OC_REST_DISCOVER,
484                                   deviceUri.str().c_str(),
485                                   nullptr, nullptr, connectivityType,
486                                   static_cast<OCQualityOfService>(QoS),
487                                   &cbdata,
488                                   nullptr, 0);
489         }
490         else
491         {
492             delete context;
493             result = OC_STACK_ERROR;
494         }
495         return result;
496     }
497
498     void parseServerHeaderOptions(OCClientResponse* clientResponse,
499                     HeaderOptions& serverHeaderOptions)
500     {
501         if (clientResponse)
502         {
503             // Parse header options from server
504             uint16_t optionID;
505             std::string optionData;
506
507             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
508             {
509                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
510                 optionData = reinterpret_cast<const char*>
511                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
512                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
513                 serverHeaderOptions.push_back(headerOption);
514             }
515         }
516         else
517         {
518             // clientResponse is invalid
519             // TODO check proper logging
520             std::cout << " Invalid response " << std::endl;
521         }
522     }
523
524 #ifdef WITH_MQ
525     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
526                     OCClientResponse* clientResponse)
527     {
528         ClientCallbackContext::MQTopicContext* context =
529             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
530         HeaderOptions serverHeaderOptions;
531
532         if (!clientResponse || !context)
533         {
534             return OC_STACK_DELETE_TRANSACTION;
535         }
536
537         std::string createdUri;
538         bool isLocationOption = false;
539         OCStackResult result = clientResponse->result;
540         if (OC_STACK_OK               == result ||
541             OC_STACK_RESOURCE_CREATED == result)
542         {
543             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
544
545             for (auto headerOption : serverHeaderOptions)
546             {
547                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
548                 {
549                     createdUri += "/";
550                     createdUri += headerOption.getOptionData();
551                     if (!isLocationOption)
552                     {
553                         isLocationOption = true;
554                     }
555                 }
556             }
557         }
558
559         if (!isLocationOption)
560         {
561             createdUri = std::string(clientResponse->resourceUri);
562         }
563
564         auto clientWrapper = context->clientWrapper.lock();
565
566         if (!clientWrapper)
567         {
568             oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
569                     << std::flush;
570             return OC_STACK_DELETE_TRANSACTION;
571         }
572
573         try
574         {
575             if (OC_STACK_OK               == result ||
576                 OC_STACK_RESOURCE_CREATED == result)
577             {
578                 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
579                                             createdUri);
580                 for (auto resource : container.Resources())
581                 {
582                     std::thread exec(context->callback, result,
583                                      createdUri,
584                                      resource);
585                     exec.detach();
586                 }
587             }
588             else
589             {
590                 std::thread exec(context->callback, result,
591                                  createdUri,
592                                  nullptr);
593                 exec.detach();
594             }
595         }
596         catch (std::exception &e)
597         {
598             oclog() << "Exception in createMQTopicCallback, ignoring response: "
599                     << e.what() << std::flush;
600         }
601         return OC_STACK_DELETE_TRANSACTION;
602     }
603
604     OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
605                 const OCDevAddr& devAddr,
606                 const std::string& uri,
607                 const OCRepresentation& rep,
608                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
609                 MQTopicCallback& callback, QualityOfService QoS)
610     {
611         if (!callback)
612         {
613             return OC_STACK_INVALID_PARAM;
614         }
615         OCStackResult result;
616         ClientCallbackContext::MQTopicContext* ctx =
617                 new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
618         OCCallbackData cbdata;
619         cbdata.context = static_cast<void*>(ctx),
620         cbdata.cb      = createMQTopicCallback;
621         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
622
623         std::string url = assembleSetResourceUri(uri, queryParams);
624
625         auto cLock = m_csdkLock.lock();
626
627         if (cLock)
628         {
629             std::lock_guard<std::recursive_mutex> lock(*cLock);
630             OCHeaderOption options[MAX_HEADER_OPTIONS];
631
632             result = OCDoResource(nullptr, OC_REST_PUT,
633                                   url.c_str(), &devAddr,
634                                   assembleSetResourcePayload(rep),
635                                   CT_DEFAULT,
636                                   static_cast<OCQualityOfService>(QoS),
637                                   &cbdata,
638                                   assembleHeaderOptions(options, headerOptions),
639                                   headerOptions.size());
640         }
641         else
642         {
643             delete ctx;
644             result = OC_STACK_ERROR;
645         }
646
647         return result;
648     }
649 #endif
650     OCStackApplicationResult getResourceCallback(void* ctx,
651                                                  OCDoHandle /*handle*/,
652         OCClientResponse* clientResponse)
653     {
654         ClientCallbackContext::GetContext* context =
655             static_cast<ClientCallbackContext::GetContext*>(ctx);
656
657         OCRepresentation rep;
658         HeaderOptions serverHeaderOptions;
659         OCStackResult result = clientResponse->result;
660         if (result == OC_STACK_OK)
661         {
662             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
663             try
664             {
665                 rep = parseGetSetCallback(clientResponse);
666             }
667             catch(OC::OCException& e)
668             {
669                 result = e.code();
670             }
671         }
672
673         std::thread exec(context->callback, serverHeaderOptions, rep, result);
674         exec.detach();
675         return OC_STACK_DELETE_TRANSACTION;
676     }
677
678     OCStackResult InProcClientWrapper::GetResourceRepresentation(
679         const OCDevAddr& devAddr,
680         const std::string& resourceUri,
681         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
682         OCConnectivityType connectivityType,
683         GetCallback& callback, QualityOfService QoS)
684     {
685         if (!callback)
686         {
687             return OC_STACK_INVALID_PARAM;
688         }
689         OCStackResult result;
690         ClientCallbackContext::GetContext* ctx =
691             new ClientCallbackContext::GetContext(callback);
692         OCCallbackData cbdata;
693         cbdata.context = static_cast<void*>(ctx),
694         cbdata.cb      = getResourceCallback;
695         cbdata.cd      = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
696
697
698         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
699
700         auto cLock = m_csdkLock.lock();
701
702         if (cLock)
703         {
704             std::lock_guard<std::recursive_mutex> lock(*cLock);
705             OCHeaderOption options[MAX_HEADER_OPTIONS];
706
707             result = OCDoResource(
708                                   nullptr, OC_REST_GET,
709                                   uri.c_str(),
710                                   &devAddr, nullptr,
711                                   connectivityType,
712                                   static_cast<OCQualityOfService>(QoS),
713                                   &cbdata,
714                                   assembleHeaderOptions(options, headerOptions),
715                                   headerOptions.size());
716         }
717         else
718         {
719             delete ctx;
720             result = OC_STACK_ERROR;
721         }
722         return result;
723     }
724
725
726     OCStackApplicationResult setResourceCallback(void* ctx,
727                                                  OCDoHandle /*handle*/,
728         OCClientResponse* clientResponse)
729     {
730         ClientCallbackContext::SetContext* context =
731             static_cast<ClientCallbackContext::SetContext*>(ctx);
732         OCRepresentation attrs;
733         HeaderOptions serverHeaderOptions;
734
735         OCStackResult result = clientResponse->result;
736         if (OC_STACK_OK               == result ||
737             OC_STACK_RESOURCE_CREATED == result ||
738             OC_STACK_RESOURCE_DELETED == result ||
739             OC_STACK_RESOURCE_CHANGED == result)
740         {
741             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
742             try
743             {
744                 attrs = parseGetSetCallback(clientResponse);
745             }
746             catch(OC::OCException& e)
747             {
748                 result = e.code();
749             }
750         }
751
752         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
753         exec.detach();
754         return OC_STACK_DELETE_TRANSACTION;
755     }
756
757     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
758         const QueryParamsMap& queryParams)
759     {
760         if (!uri.empty())
761         {
762             if (uri.back() == '/')
763             {
764                 uri.resize(uri.size() - 1);
765             }
766         }
767
768         ostringstream paramsList;
769         if (queryParams.size() > 0)
770         {
771             paramsList << '?';
772         }
773
774         for (auto& param : queryParams)
775         {
776             paramsList << param.first <<'='<<param.second<<';';
777         }
778
779         std::string queryString = paramsList.str();
780
781         if (queryString.empty())
782         {
783             return uri;
784         }
785
786         if (queryString.back() == ';')
787         {
788             queryString.resize(queryString.size() - 1);
789         }
790
791         std::string ret = uri + queryString;
792         return ret;
793     }
794
795     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
796         const QueryParamsList& queryParams)
797     {
798         if (!uri.empty())
799         {
800             if (uri.back() == '/')
801             {
802                 uri.resize(uri.size() - 1);
803             }
804         }
805
806         ostringstream paramsList;
807         if (queryParams.size() > 0)
808         {
809             paramsList << '?';
810         }
811
812         for (auto& param : queryParams)
813         {
814             for (auto& paramList : param.second)
815             {
816                 paramsList << param.first << '=' << paramList << ';';
817             }
818         }
819
820         std::string queryString = paramsList.str();
821
822         if (queryString.empty())
823         {
824             return uri;
825         }
826
827         if (queryString.back() == ';')
828         {
829             queryString.resize(queryString.size() - 1);
830         }
831
832         std::string ret = uri + queryString;
833         return ret;
834     }
835
836     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
837     {
838         MessageContainer ocInfo;
839         ocInfo.addRepresentation(rep);
840         for(const OCRepresentation& r : rep.getChildren())
841         {
842             ocInfo.addRepresentation(r);
843         }
844
845         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
846     }
847
848     OCStackResult InProcClientWrapper::PostResourceRepresentation(
849         const OCDevAddr& devAddr,
850         const std::string& uri,
851         const OCRepresentation& rep,
852         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
853         OCConnectivityType connectivityType,
854         PostCallback& callback, QualityOfService QoS)
855     {
856         if (!callback)
857         {
858             return OC_STACK_INVALID_PARAM;
859         }
860         OCStackResult result;
861         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
862         OCCallbackData cbdata;
863         cbdata.context = static_cast<void*>(ctx),
864         cbdata.cb      = setResourceCallback;
865         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
866
867
868         std::string url = assembleSetResourceUri(uri, queryParams);
869
870         auto cLock = m_csdkLock.lock();
871
872         if (cLock)
873         {
874             std::lock_guard<std::recursive_mutex> lock(*cLock);
875             OCHeaderOption options[MAX_HEADER_OPTIONS];
876
877             result = OCDoResource(nullptr, OC_REST_POST,
878                                   url.c_str(), &devAddr,
879                                   assembleSetResourcePayload(rep),
880                                   connectivityType,
881                                   static_cast<OCQualityOfService>(QoS),
882                                   &cbdata,
883                                   assembleHeaderOptions(options, headerOptions),
884                                   headerOptions.size());
885         }
886         else
887         {
888             delete ctx;
889             result = OC_STACK_ERROR;
890         }
891
892         return result;
893     }
894
895     OCStackResult InProcClientWrapper::PutResourceRepresentation(
896         const OCDevAddr& devAddr,
897         const std::string& uri,
898         const OCRepresentation& rep,
899         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
900         PutCallback& callback, QualityOfService QoS)
901     {
902         if (!callback)
903         {
904             return OC_STACK_INVALID_PARAM;
905         }
906         OCStackResult result;
907         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
908         OCCallbackData cbdata;
909         cbdata.context = static_cast<void*>(ctx),
910         cbdata.cb      = setResourceCallback;
911         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
912
913
914         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
915
916         auto cLock = m_csdkLock.lock();
917
918         if (cLock)
919         {
920             std::lock_guard<std::recursive_mutex> lock(*cLock);
921             OCDoHandle handle;
922             OCHeaderOption options[MAX_HEADER_OPTIONS];
923
924             result = OCDoResource(&handle, OC_REST_PUT,
925                                   url.c_str(), &devAddr,
926                                   assembleSetResourcePayload(rep),
927                                   CT_DEFAULT,
928                                   static_cast<OCQualityOfService>(QoS),
929                                   &cbdata,
930                                   assembleHeaderOptions(options, headerOptions),
931                                   headerOptions.size());
932         }
933         else
934         {
935             delete ctx;
936             result = OC_STACK_ERROR;
937         }
938
939         return result;
940     }
941
942     OCStackApplicationResult deleteResourceCallback(void* ctx,
943                                                     OCDoHandle /*handle*/,
944         OCClientResponse* clientResponse)
945     {
946         ClientCallbackContext::DeleteContext* context =
947             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
948         HeaderOptions serverHeaderOptions;
949
950         if (clientResponse->result == OC_STACK_OK)
951         {
952             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
953         }
954         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
955         exec.detach();
956         return OC_STACK_DELETE_TRANSACTION;
957     }
958
959     OCStackResult InProcClientWrapper::DeleteResource(
960         const OCDevAddr& devAddr,
961         const std::string& uri,
962         const HeaderOptions& headerOptions,
963         OCConnectivityType connectivityType,
964         DeleteCallback& callback,
965         QualityOfService /*QoS*/)
966     {
967         if (!callback)
968         {
969             return OC_STACK_INVALID_PARAM;
970         }
971         OCStackResult result;
972         ClientCallbackContext::DeleteContext* ctx =
973             new ClientCallbackContext::DeleteContext(callback);
974         OCCallbackData cbdata;
975         cbdata.context = static_cast<void*>(ctx),
976         cbdata.cb      = deleteResourceCallback;
977         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
978
979
980         auto cLock = m_csdkLock.lock();
981
982         if (cLock)
983         {
984             OCHeaderOption options[MAX_HEADER_OPTIONS];
985
986             std::lock_guard<std::recursive_mutex> lock(*cLock);
987
988             result = OCDoResource(nullptr, OC_REST_DELETE,
989                                   uri.c_str(), &devAddr,
990                                   nullptr,
991                                   connectivityType,
992                                   static_cast<OCQualityOfService>(m_cfg.QoS),
993                                   &cbdata,
994                                   assembleHeaderOptions(options, headerOptions),
995                                   headerOptions.size());
996         }
997         else
998         {
999             delete ctx;
1000             result = OC_STACK_ERROR;
1001         }
1002
1003         return result;
1004     }
1005
1006     OCStackApplicationResult observeResourceCallback(void* ctx,
1007                                                      OCDoHandle /*handle*/,
1008         OCClientResponse* clientResponse)
1009     {
1010         ClientCallbackContext::ObserveContext* context =
1011             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
1012         OCRepresentation attrs;
1013         HeaderOptions serverHeaderOptions;
1014         uint32_t sequenceNumber = clientResponse->sequenceNumber;
1015         OCStackResult result = clientResponse->result;
1016         if (clientResponse->result == OC_STACK_OK)
1017         {
1018             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
1019             try
1020             {
1021                 attrs = parseGetSetCallback(clientResponse);
1022             }
1023             catch(OC::OCException& e)
1024             {
1025                 result = e.code();
1026             }
1027         }
1028         std::thread exec(context->callback, serverHeaderOptions, attrs,
1029                     result, sequenceNumber);
1030         exec.detach();
1031
1032         return OC_STACK_KEEP_TRANSACTION;
1033     }
1034
1035     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
1036         const OCDevAddr& devAddr,
1037         const std::string& uri,
1038         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
1039         ObserveCallback& callback, QualityOfService QoS)
1040     {
1041         if (!callback)
1042         {
1043             return OC_STACK_INVALID_PARAM;
1044         }
1045         OCStackResult result;
1046
1047         ClientCallbackContext::ObserveContext* ctx =
1048             new ClientCallbackContext::ObserveContext(callback);
1049         OCCallbackData cbdata;
1050         cbdata.context = static_cast<void*>(ctx),
1051         cbdata.cb      = observeResourceCallback;
1052         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1053
1054
1055         OCMethod method;
1056         if (observeType == ObserveType::Observe)
1057         {
1058             method = OC_REST_OBSERVE;
1059         }
1060         else if (observeType == ObserveType::ObserveAll)
1061         {
1062             method = OC_REST_OBSERVE_ALL;
1063         }
1064         else
1065         {
1066             method = OC_REST_OBSERVE_ALL;
1067         }
1068
1069         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
1070
1071         auto cLock = m_csdkLock.lock();
1072
1073         if (cLock)
1074         {
1075             std::lock_guard<std::recursive_mutex> lock(*cLock);
1076             OCHeaderOption options[MAX_HEADER_OPTIONS];
1077
1078             result = OCDoResource(handle, method,
1079                                   url.c_str(), &devAddr,
1080                                   nullptr,
1081                                   CT_DEFAULT,
1082                                   static_cast<OCQualityOfService>(QoS),
1083                                   &cbdata,
1084                                   assembleHeaderOptions(options, headerOptions),
1085                                   headerOptions.size());
1086         }
1087         else
1088         {
1089             delete ctx;
1090             return OC_STACK_ERROR;
1091         }
1092
1093         return result;
1094     }
1095
1096     OCStackResult InProcClientWrapper::CancelObserveResource(
1097             OCDoHandle handle,
1098             const std::string& /*host*/,
1099             const std::string& /*uri*/,
1100             const HeaderOptions& headerOptions,
1101             QualityOfService QoS)
1102     {
1103         OCStackResult result;
1104         auto cLock = m_csdkLock.lock();
1105
1106         if (cLock)
1107         {
1108             std::lock_guard<std::recursive_mutex> lock(*cLock);
1109             OCHeaderOption options[MAX_HEADER_OPTIONS];
1110
1111             result = OCCancel(handle,
1112                     static_cast<OCQualityOfService>(QoS),
1113                     assembleHeaderOptions(options, headerOptions),
1114                     headerOptions.size());
1115         }
1116         else
1117         {
1118             result = OC_STACK_ERROR;
1119         }
1120
1121         return result;
1122     }
1123
1124     OCStackApplicationResult subscribePresenceCallback(void* ctx,
1125                                                        OCDoHandle /*handle*/,
1126             OCClientResponse* clientResponse)
1127     {
1128         ClientCallbackContext::SubscribePresenceContext* context =
1129         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
1130
1131         /*
1132          * This a hack while we rethink presence subscription.
1133          */
1134         std::string url = clientResponse->devAddr.addr;
1135
1136         std::thread exec(context->callback, clientResponse->result,
1137                     clientResponse->sequenceNumber, url);
1138
1139         exec.detach();
1140
1141         return OC_STACK_KEEP_TRANSACTION;
1142     }
1143
1144     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
1145         const std::string& host, const std::string& resourceType,
1146         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
1147     {
1148         if (!presenceHandler)
1149         {
1150             return OC_STACK_INVALID_PARAM;
1151         }
1152
1153         ClientCallbackContext::SubscribePresenceContext* ctx =
1154             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
1155         OCCallbackData cbdata;
1156         cbdata.context = static_cast<void*>(ctx),
1157         cbdata.cb      = subscribePresenceCallback;
1158         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
1159
1160
1161         auto cLock = m_csdkLock.lock();
1162
1163         std::ostringstream os;
1164         os << host << OC_RSRVD_PRESENCE_URI;
1165
1166         if (!resourceType.empty())
1167         {
1168             os << "?rt=" << resourceType;
1169         }
1170
1171         if (!cLock)
1172         {
1173             delete ctx;
1174             return OC_STACK_ERROR;
1175         }
1176
1177         return OCDoResource(handle, OC_REST_PRESENCE,
1178                             os.str().c_str(), nullptr,
1179                             nullptr, connectivityType,
1180                             OC_LOW_QOS, &cbdata, NULL, 0);
1181     }
1182
1183     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
1184     {
1185         OCStackResult result;
1186         auto cLock = m_csdkLock.lock();
1187
1188         if (cLock)
1189         {
1190             std::lock_guard<std::recursive_mutex> lock(*cLock);
1191             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
1192         }
1193         else
1194         {
1195             result = OC_STACK_ERROR;
1196         }
1197
1198         return result;
1199     }
1200
1201 #ifdef WITH_CLOUD
1202     OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
1203                                                                const std::string& host,
1204                                                                const std::vector<std::string>& di,
1205                                                                OCConnectivityType connectivityType,
1206                                                                ObserveCallback& callback)
1207     {
1208         if (!callback)
1209         {
1210             return OC_STACK_INVALID_PARAM;
1211         }
1212         OCStackResult result;
1213
1214         ClientCallbackContext::ObserveContext* ctx =
1215             new ClientCallbackContext::ObserveContext(callback);
1216         OCCallbackData cbdata;
1217         cbdata.context = static_cast<void*>(ctx),
1218         cbdata.cb      = observeResourceCallback;
1219         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1220
1221         auto cLock = m_csdkLock.lock();
1222
1223         if (cLock)
1224         {
1225             std::lock_guard<std::recursive_mutex> lock(*cLock);
1226
1227             std::ostringstream os;
1228             os << host << OC_RSRVD_DEVICE_PRESENCE_URI;
1229             QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
1230             std::string url = assembleSetResourceUri(os.str(), queryParams);
1231
1232             result = OCDoResource(handle, OC_REST_OBSERVE,
1233                                   url.c_str(), nullptr,
1234                                   nullptr, connectivityType,
1235                                   OC_LOW_QOS, &cbdata,
1236                                   nullptr, 0);
1237         }
1238         else
1239         {
1240             delete ctx;
1241             result = OC_STACK_ERROR;
1242         }
1243
1244         return result;
1245     }
1246 #endif
1247
1248     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
1249     {
1250         qos = m_cfg.QoS;
1251         return OC_STACK_OK;
1252     }
1253
1254     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
1255            const HeaderOptions& headerOptions)
1256     {
1257         int i = 0;
1258
1259         if ( headerOptions.size() == 0)
1260         {
1261             return nullptr;
1262         }
1263
1264         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
1265         {
1266             options[i] = OCHeaderOption();
1267             options[i].protocolID = OC_COAP_ID;
1268             options[i].optionID = it->getOptionID();
1269             options[i].optionLength = it->getOptionData().length() + 1;
1270             strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));
1271             i++;
1272         }
1273
1274         return options;
1275     }
1276
1277     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
1278     {
1279         if (!dev)
1280         {
1281             return nullptr;
1282         }
1283
1284         OCDPDev_t* result = new OCDPDev_t(*dev);
1285         result->prm = new OCPrm_t[dev->prmLen];
1286         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
1287         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
1288     }
1289
1290     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
1291     {
1292         while(list)
1293         {
1294             dpList.push_back(cloneDevice(list));
1295             list = list->next;
1296         }
1297     }
1298
1299     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
1300             GetDirectPairedCallback& callback)
1301     {
1302         if (!callback || 0 == waittime)
1303         {
1304             return OC_STACK_INVALID_PARAM;
1305         }
1306
1307         OCStackResult result = OC_STACK_ERROR;
1308         const OCDPDev_t *list = nullptr;
1309         PairedDevices dpDeviceList;
1310
1311         auto cLock = m_csdkLock.lock();
1312
1313         if (cLock)
1314         {
1315             std::lock_guard<std::recursive_mutex> lock(*cLock);
1316
1317             list = OCDiscoverDirectPairingDevices(waittime);
1318             if (NULL == list)
1319             {
1320                 result = OC_STACK_NO_RESOURCE;
1321                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1322                     << std::flush;
1323             }
1324             else {
1325                 convert(list, dpDeviceList);
1326                 std::thread exec(callback, dpDeviceList);
1327                 exec.detach();
1328                 result = OC_STACK_OK;
1329             }
1330         }
1331         else
1332         {
1333             result = OC_STACK_ERROR;
1334         }
1335
1336         return result;
1337     }
1338
1339     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1340     {
1341         if (!callback)
1342         {
1343             return OC_STACK_INVALID_PARAM;
1344         }
1345
1346         OCStackResult result = OC_STACK_ERROR;
1347         const OCDPDev_t *list = nullptr;
1348         PairedDevices dpDeviceList;
1349
1350         auto cLock = m_csdkLock.lock();
1351
1352         if (cLock)
1353         {
1354             std::lock_guard<std::recursive_mutex> lock(*cLock);
1355
1356             list = OCGetDirectPairedDevices();
1357             if (NULL == list)
1358             {
1359                 result = OC_STACK_NO_RESOURCE;
1360                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1361                     << std::flush;
1362             }
1363             else {
1364                 convert(list, dpDeviceList);
1365                 std::thread exec(callback, dpDeviceList);
1366                 exec.detach();
1367                 result = OC_STACK_OK;
1368             }
1369         }
1370         else
1371         {
1372             result = OC_STACK_ERROR;
1373         }
1374
1375         return result;
1376     }
1377
1378     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1379             OCStackResult result)
1380     {
1381
1382         ClientCallbackContext::DirectPairingContext* context =
1383             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1384
1385         std::thread exec(context->callback, cloneDevice(peer), result);
1386         exec.detach();
1387     }
1388
1389     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1390             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1391     {
1392         if (!peer || !callback)
1393         {
1394             oclog() << "Invalid parameters" << std::flush;
1395             return OC_STACK_INVALID_PARAM;
1396         }
1397
1398         OCStackResult result = OC_STACK_ERROR;
1399         ClientCallbackContext::DirectPairingContext* context =
1400             new ClientCallbackContext::DirectPairingContext(callback);
1401
1402         auto cLock = m_csdkLock.lock();
1403         if (cLock)
1404         {
1405             std::lock_guard<std::recursive_mutex> lock(*cLock);
1406             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1407                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1408         }
1409         else
1410         {
1411             delete context;
1412             result = OC_STACK_ERROR;
1413         }
1414         return result;
1415     }
1416 }