[IOT-1451] Keep client callback when receiving error result
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
28 using namespace std;
29
30 namespace OC
31 {
32     InProcClientWrapper::InProcClientWrapper(
33         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34             : m_threadRun(false), m_csdkLock(csdkLock),
35               m_cfg { cfg }
36     {
37         // if the config type is server, we ought to never get called.  If the config type
38         // is both, we count on the server to run the thread and do the initialize
39
40         if (m_cfg.mode == ModeType::Client)
41         {
42             OCTransportFlags serverFlags =
43                             static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44             OCTransportFlags clientFlags =
45                             static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46             OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47
48             if (OC_STACK_OK != result)
49             {
50                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
51             }
52
53             m_threadRun = true;
54             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
55         }
56     }
57
58     InProcClientWrapper::~InProcClientWrapper()
59     {
60         if (m_threadRun && m_listeningThread.joinable())
61         {
62             m_threadRun = false;
63             m_listeningThread.join();
64         }
65
66         // only stop if we are the ones who actually called 'init'.  We are counting
67         // on the server to do the stop.
68         if (m_cfg.mode == ModeType::Client)
69         {
70             OCStop();
71         }
72     }
73
74     void InProcClientWrapper::listeningFunc()
75     {
76         while(m_threadRun)
77         {
78             OCStackResult result;
79             auto cLock = m_csdkLock.lock();
80             if (cLock)
81             {
82                 std::lock_guard<std::recursive_mutex> lock(*cLock);
83                 result = OCProcess();
84             }
85             else
86             {
87                 result = OC_STACK_ERROR;
88             }
89
90             if (result != OC_STACK_OK)
91             {
92                 // TODO: do something with result if failed?
93             }
94
95             // To minimize CPU utilization we may wish to do this with sleep
96             std::this_thread::sleep_for(std::chrono::milliseconds(10));
97         }
98     }
99
100     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101     {
102         if (clientResponse->payload == nullptr ||
103                 (
104                     clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105                     clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106                     clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
107                 )
108           )
109         {
110             //OCPayloadDestroy(clientResponse->payload);
111             return OCRepresentation();
112         }
113
114         MessageContainer oc;
115         oc.setPayload(clientResponse->payload);
116         //OCPayloadDestroy(clientResponse->payload);
117
118         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119         if (it == oc.representations().end())
120         {
121             return OCRepresentation();
122         }
123
124         // first one is considered the root, everything else is considered a child of this one.
125         OCRepresentation root = *it;
126         root.setDevAddr(clientResponse->devAddr);
127         root.setUri(clientResponse->resourceUri);
128         ++it;
129
130         std::for_each(it, oc.representations().end(),
131                 [&root](const OCRepresentation& repItr)
132                 {root.addChild(repItr);});
133         return root;
134
135     }
136
137     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138         OCClientResponse* clientResponse)
139     {
140         ClientCallbackContext::ListenContext* context =
141             static_cast<ClientCallbackContext::ListenContext*>(ctx);
142
143         if (clientResponse->result != OC_STACK_OK)
144         {
145             oclog() << "listenCallback(): failed to create resource. clientResponse: "
146                     << clientResponse->result
147                     << std::flush;
148
149             return OC_STACK_KEEP_TRANSACTION;
150         }
151
152         if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153         {
154             oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155                 << std::flush;
156             return OC_STACK_KEEP_TRANSACTION;
157         }
158
159         auto clientWrapper = context->clientWrapper.lock();
160
161         if (!clientWrapper)
162         {
163             oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164                     << std::flush;
165             return OC_STACK_KEEP_TRANSACTION;
166         }
167
168         try
169         {
170             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
171                                     reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
172             // loop to ensure valid construction of all resources
173
174             for(auto resource : container.Resources())
175             {
176                 std::thread exec(context->callback, resource);
177                 exec.detach();
178             }
179         }
180         catch (std::exception &e)
181         {
182             oclog() << "Exception in listCallback, ignoring response: "
183                     << e.what() << std::flush;
184         }
185
186
187         return OC_STACK_KEEP_TRANSACTION;
188     }
189
190     OCStackApplicationResult listenErrorCallback(void* ctx, OCDoHandle /*handle*/,
191         OCClientResponse* clientResponse)
192     {
193         if (!ctx || !clientResponse)
194         {
195             return OC_STACK_KEEP_TRANSACTION;
196         }
197
198         ClientCallbackContext::ListenErrorContext* context =
199             static_cast<ClientCallbackContext::ListenErrorContext*>(ctx);
200         if (!context)
201         {
202             return OC_STACK_KEEP_TRANSACTION;
203         }
204
205         OCStackResult result = clientResponse->result;
206         if (result == OC_STACK_OK)
207         {
208             if (!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
209             {
210                 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
211                     << std::flush;
212                 return OC_STACK_KEEP_TRANSACTION;
213             }
214
215             auto clientWrapper = context->clientWrapper.lock();
216
217             if (!clientWrapper)
218             {
219                 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
220                         << std::flush;
221                 return OC_STACK_KEEP_TRANSACTION;
222             }
223
224             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
225                                         reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
226             // loop to ensure valid construction of all resources
227             for (auto resource : container.Resources())
228             {
229                 std::thread exec(context->callback, resource);
230                 exec.detach();
231             }
232             return OC_STACK_KEEP_TRANSACTION;
233         }
234
235         std::string resourceURI = clientResponse->resourceUri;
236         std::thread exec(context->errorCallback, resourceURI, result);
237         exec.detach();
238         return OC_STACK_KEEP_TRANSACTION;
239     }
240
241     OCStackResult InProcClientWrapper::ListenForResource(
242             const std::string& serviceUrl,
243             const std::string& resourceType,
244             OCConnectivityType connectivityType,
245             FindCallback& callback, QualityOfService QoS)
246     {
247         if (!callback)
248         {
249             return OC_STACK_INVALID_PARAM;
250         }
251
252         OCStackResult result;
253         ostringstream resourceUri;
254         resourceUri << serviceUrl << resourceType;
255
256         ClientCallbackContext::ListenContext* context =
257             new ClientCallbackContext::ListenContext(callback, shared_from_this());
258         OCCallbackData cbdata;
259         cbdata.context = static_cast<void*>(context),
260         cbdata.cb      = listenCallback;
261         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
262
263         auto cLock = m_csdkLock.lock();
264         if (cLock)
265         {
266             std::lock_guard<std::recursive_mutex> lock(*cLock);
267             result = OCDoResource(nullptr, OC_REST_DISCOVER,
268                                   resourceUri.str().c_str(),
269                                   nullptr, nullptr, connectivityType,
270                                   static_cast<OCQualityOfService>(QoS),
271                                   &cbdata,
272                                   nullptr, 0);
273         }
274         else
275         {
276             delete context;
277             result = OC_STACK_ERROR;
278         }
279         return result;
280     }
281
282     OCStackResult InProcClientWrapper::ListenErrorForResource(
283             const std::string& serviceUrl,
284             const std::string& resourceType,
285             OCConnectivityType connectivityType,
286             FindCallback& callback, FindErrorCallback& errorCallback,
287             QualityOfService QoS)
288     {
289         if (!callback)
290         {
291             return OC_STACK_INVALID_PARAM;
292         }
293
294         ostringstream resourceUri;
295         resourceUri << serviceUrl << resourceType;
296
297         ClientCallbackContext::ListenErrorContext* context =
298             new ClientCallbackContext::ListenErrorContext(callback, errorCallback,
299                                                           shared_from_this());
300         if (!context)
301         {
302             return OC_STACK_ERROR;
303         }
304
305         OCCallbackData cbdata(
306                 static_cast<void*>(context),
307                 listenErrorCallback,
308                 [](void* c){delete static_cast<ClientCallbackContext::ListenErrorContext*>(c);}
309             );
310
311         OCStackResult result;
312         auto cLock = m_csdkLock.lock();
313         if (cLock)
314         {
315             std::lock_guard<std::recursive_mutex> lock(*cLock);
316             result = OCDoResource(nullptr, OC_REST_DISCOVER,
317                                   resourceUri.str().c_str(),
318                                   nullptr, nullptr, connectivityType,
319                                   static_cast<OCQualityOfService>(QoS),
320                                   &cbdata,
321                                   nullptr, 0);
322         }
323         else
324         {
325             delete context;
326             result = OC_STACK_ERROR;
327         }
328         return result;
329     }
330 #ifdef WITH_MQ
331     OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
332                                               OCClientResponse* clientResponse)
333     {
334         ClientCallbackContext::MQTopicContext* context =
335             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
336
337         if (!clientResponse || !context)
338         {
339             return OC_STACK_DELETE_TRANSACTION;
340         }
341
342         std::string resourceURI = clientResponse->resourceUri;
343         if (clientResponse->result != OC_STACK_OK)
344         {
345             oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
346                     << clientResponse->result
347                     << std::flush;
348
349             std::thread exec(context->callback, clientResponse->result,
350                              resourceURI, nullptr);
351             exec.detach();
352
353             return OC_STACK_DELETE_TRANSACTION;
354         }
355
356         auto clientWrapper = context->clientWrapper.lock();
357         if (!clientWrapper)
358         {
359             oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
360                     << std::flush;
361             return OC_STACK_DELETE_TRANSACTION;
362         }
363
364         try
365         {
366             ListenOCContainer container(clientWrapper, clientResponse->devAddr,
367                                         (OCRepPayload *) clientResponse->payload);
368
369             // loop to ensure valid construction of all resources
370             for (auto resource : container.Resources())
371             {
372                 std::thread exec(context->callback, clientResponse->result,
373                                  resourceURI, resource);
374                 exec.detach();
375             }
376         }
377         catch (std::exception &e)
378         {
379             oclog() << "Exception in listCallback, ignoring response: "
380                     << e.what() << std::flush;
381         }
382
383         return OC_STACK_DELETE_TRANSACTION;
384     }
385
386     OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
387                                                         const std::string& resourceUri,
388                                                         const QueryParamsMap& queryParams,
389                                                         const HeaderOptions& headerOptions,
390                                                         MQTopicCallback& callback,
391                                                         QualityOfService QoS)
392     {
393         oclog() << "ListenForMQTopic()" << std::flush;
394
395         if (!callback)
396         {
397             return OC_STACK_INVALID_PARAM;
398         }
399
400         ClientCallbackContext::MQTopicContext* context =
401             new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
402         OCCallbackData cbdata;
403         cbdata.context = static_cast<void*>(context),
404         cbdata.cb      = listenMQCallback;
405         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
406
407         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
408
409         OCStackResult result = OC_STACK_ERROR;
410         auto cLock = m_csdkLock.lock();
411         if (cLock)
412         {
413             std::lock_guard<std::recursive_mutex> lock(*cLock);
414             OCHeaderOption options[MAX_HEADER_OPTIONS];
415             result = OCDoResource(
416                                   nullptr, OC_REST_GET,
417                                   uri.c_str(),
418                                   &devAddr, nullptr,
419                                   CT_DEFAULT,
420                                   static_cast<OCQualityOfService>(QoS),
421                                   &cbdata,
422                                   assembleHeaderOptions(options, headerOptions),
423                                   headerOptions.size());
424         }
425         else
426         {
427             delete context;
428         }
429
430         return result;
431     }
432 #endif
433
434     OCStackApplicationResult listenDeviceCallback(void* ctx,
435                                                   OCDoHandle /*handle*/,
436             OCClientResponse* clientResponse)
437     {
438         ClientCallbackContext::DeviceListenContext* context =
439             static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
440
441         try
442         {
443             OCRepresentation rep = parseGetSetCallback(clientResponse);
444             std::thread exec(context->callback, rep);
445             exec.detach();
446         }
447         catch(OC::OCException& e)
448         {
449             oclog() <<"Exception in listenDeviceCallback, ignoring response: "
450                 <<e.what() <<std::flush;
451         }
452
453         return OC_STACK_KEEP_TRANSACTION;
454     }
455
456     OCStackResult InProcClientWrapper::ListenForDevice(
457             const std::string& serviceUrl,
458             const std::string& deviceURI,
459             OCConnectivityType connectivityType,
460             FindDeviceCallback& callback,
461             QualityOfService QoS)
462     {
463         if (!callback)
464         {
465             return OC_STACK_INVALID_PARAM;
466         }
467         OCStackResult result;
468         ostringstream deviceUri;
469         deviceUri << serviceUrl << deviceURI;
470
471         ClientCallbackContext::DeviceListenContext* context =
472             new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
473         OCCallbackData cbdata;
474
475         cbdata.context = static_cast<void*>(context),
476         cbdata.cb      = listenDeviceCallback;
477         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
478
479         auto cLock = m_csdkLock.lock();
480         if (cLock)
481         {
482             std::lock_guard<std::recursive_mutex> lock(*cLock);
483             result = OCDoResource(nullptr, OC_REST_DISCOVER,
484                                   deviceUri.str().c_str(),
485                                   nullptr, nullptr, connectivityType,
486                                   static_cast<OCQualityOfService>(QoS),
487                                   &cbdata,
488                                   nullptr, 0);
489         }
490         else
491         {
492             delete context;
493             result = OC_STACK_ERROR;
494         }
495         return result;
496     }
497
498     void parseServerHeaderOptions(OCClientResponse* clientResponse,
499                     HeaderOptions& serverHeaderOptions)
500     {
501         if (clientResponse)
502         {
503             // Parse header options from server
504             uint16_t optionID;
505             std::string optionData;
506
507             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
508             {
509                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
510                 optionData = reinterpret_cast<const char*>
511                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
512                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
513                 serverHeaderOptions.push_back(headerOption);
514             }
515         }
516         else
517         {
518             // clientResponse is invalid
519             // TODO check proper logging
520             std::cout << " Invalid response " << std::endl;
521         }
522     }
523
524 #ifdef WITH_MQ
525     OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
526                     OCClientResponse* clientResponse)
527     {
528         ClientCallbackContext::MQTopicContext* context =
529             static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
530         HeaderOptions serverHeaderOptions;
531
532         if (!clientResponse || !context)
533         {
534             return OC_STACK_DELETE_TRANSACTION;
535         }
536
537         std::string createdUri;
538         bool isLocationOption = false;
539         OCStackResult result = clientResponse->result;
540         if (OC_STACK_OK               == result ||
541             OC_STACK_RESOURCE_CREATED == result)
542         {
543             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
544
545             for (auto headerOption : serverHeaderOptions)
546             {
547                 if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
548                 {
549                     createdUri += "/";
550                     createdUri += headerOption.getOptionData();
551                     if (!isLocationOption)
552                     {
553                         isLocationOption = true;
554                     }
555                 }
556             }
557         }
558
559         if (!isLocationOption)
560         {
561             createdUri = std::string(clientResponse->resourceUri);
562         }
563
564         auto clientWrapper = context->clientWrapper.lock();
565
566         if (!clientWrapper)
567         {
568             oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
569                     << std::flush;
570             return OC_STACK_DELETE_TRANSACTION;
571         }
572
573         try
574         {
575             if (OC_STACK_OK               == result ||
576                 OC_STACK_RESOURCE_CREATED == result)
577             {
578                 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
579                                             createdUri);
580                 for (auto resource : container.Resources())
581                 {
582                     std::thread exec(context->callback, result,
583                                      createdUri,
584                                      resource);
585                     exec.detach();
586                 }
587             }
588             else
589             {
590                 std::thread exec(context->callback, result,
591                                  createdUri,
592                                  nullptr);
593                 exec.detach();
594             }
595         }
596         catch (std::exception &e)
597         {
598             oclog() << "Exception in createMQTopicCallback, ignoring response: "
599                     << e.what() << std::flush;
600         }
601         return OC_STACK_DELETE_TRANSACTION;
602     }
603
604     OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
605                 const OCDevAddr& devAddr,
606                 const std::string& uri,
607                 const OCRepresentation& rep,
608                 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
609                 MQTopicCallback& callback, QualityOfService QoS)
610     {
611         if (!callback)
612         {
613             return OC_STACK_INVALID_PARAM;
614         }
615         OCStackResult result;
616         ClientCallbackContext::MQTopicContext* ctx =
617                 new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
618         OCCallbackData cbdata;
619         cbdata.context = static_cast<void*>(ctx),
620         cbdata.cb      = createMQTopicCallback;
621         cbdata.cd      = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
622
623         std::string url = assembleSetResourceUri(uri, queryParams);
624
625         auto cLock = m_csdkLock.lock();
626
627         if (cLock)
628         {
629             std::lock_guard<std::recursive_mutex> lock(*cLock);
630             OCHeaderOption options[MAX_HEADER_OPTIONS];
631
632             result = OCDoResource(nullptr, OC_REST_PUT,
633                                   url.c_str(), &devAddr,
634                                   assembleSetResourcePayload(rep),
635                                   CT_DEFAULT,
636                                   static_cast<OCQualityOfService>(QoS),
637                                   &cbdata,
638                                   assembleHeaderOptions(options, headerOptions),
639                                   headerOptions.size());
640         }
641         else
642         {
643             delete ctx;
644             result = OC_STACK_ERROR;
645         }
646
647         return result;
648     }
649 #endif
650     OCStackApplicationResult getResourceCallback(void* ctx,
651                                                  OCDoHandle /*handle*/,
652         OCClientResponse* clientResponse)
653     {
654         ClientCallbackContext::GetContext* context =
655             static_cast<ClientCallbackContext::GetContext*>(ctx);
656
657         OCRepresentation rep;
658         HeaderOptions serverHeaderOptions;
659         OCStackResult result = clientResponse->result;
660         if (result == OC_STACK_OK)
661         {
662             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
663             try
664             {
665                 rep = parseGetSetCallback(clientResponse);
666             }
667             catch(OC::OCException& e)
668             {
669                 result = e.code();
670             }
671         }
672
673         std::thread exec(context->callback, serverHeaderOptions, rep, result);
674         exec.detach();
675         return OC_STACK_DELETE_TRANSACTION;
676     }
677
678     OCStackResult InProcClientWrapper::GetResourceRepresentation(
679         const OCDevAddr& devAddr,
680         const std::string& resourceUri,
681         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
682         OCConnectivityType connectivityType,
683         GetCallback& callback, QualityOfService QoS)
684     {
685         if (!callback)
686         {
687             return OC_STACK_INVALID_PARAM;
688         }
689         OCStackResult result;
690         ClientCallbackContext::GetContext* ctx =
691             new ClientCallbackContext::GetContext(callback);
692         OCCallbackData cbdata;
693         cbdata.context = static_cast<void*>(ctx),
694         cbdata.cb      = getResourceCallback;
695         cbdata.cd      = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
696
697
698         std::string uri = assembleSetResourceUri(resourceUri, queryParams);
699
700         auto cLock = m_csdkLock.lock();
701
702         if (cLock)
703         {
704             std::lock_guard<std::recursive_mutex> lock(*cLock);
705             OCHeaderOption options[MAX_HEADER_OPTIONS];
706
707             result = OCDoResource(
708                                   nullptr, OC_REST_GET,
709                                   uri.c_str(),
710                                   &devAddr, nullptr,
711                                   connectivityType,
712                                   static_cast<OCQualityOfService>(QoS),
713                                   &cbdata,
714                                   assembleHeaderOptions(options, headerOptions),
715                                   headerOptions.size());
716         }
717         else
718         {
719             delete ctx;
720             result = OC_STACK_ERROR;
721         }
722         return result;
723     }
724
725
726     OCStackApplicationResult setResourceCallback(void* ctx,
727                                                  OCDoHandle /*handle*/,
728         OCClientResponse* clientResponse)
729     {
730         ClientCallbackContext::SetContext* context =
731             static_cast<ClientCallbackContext::SetContext*>(ctx);
732         OCRepresentation attrs;
733         HeaderOptions serverHeaderOptions;
734
735         OCStackResult result = clientResponse->result;
736         if (OC_STACK_OK               == result ||
737             OC_STACK_RESOURCE_CREATED == result ||
738             OC_STACK_RESOURCE_DELETED == result ||
739             OC_STACK_RESOURCE_CHANGED == result)
740         {
741             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
742             try
743             {
744                 attrs = parseGetSetCallback(clientResponse);
745             }
746             catch(OC::OCException& e)
747             {
748                 result = e.code();
749             }
750         }
751
752         std::thread exec(context->callback, serverHeaderOptions, attrs, result);
753         exec.detach();
754         return OC_STACK_DELETE_TRANSACTION;
755     }
756
757     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
758         const QueryParamsMap& queryParams)
759     {
760         if (!uri.empty())
761         {
762             if (uri.back() == '/')
763             {
764                 uri.resize(uri.size() - 1);
765             }
766         }
767
768         ostringstream paramsList;
769         if (queryParams.size() > 0)
770         {
771             paramsList << '?';
772         }
773
774         for (auto& param : queryParams)
775         {
776             paramsList << param.first <<'='<<param.second<<';';
777         }
778
779         std::string queryString = paramsList.str();
780
781         if (queryString.empty())
782         {
783             return uri;
784         }
785
786         if (queryString.back() == ';')
787         {
788             queryString.resize(queryString.size() - 1);
789         }
790
791         std::string ret = uri + queryString;
792         return ret;
793     }
794
795     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
796         const QueryParamsList& queryParams)
797     {
798         if (!uri.empty())
799         {
800             if (uri.back() == '/')
801             {
802                 uri.resize(uri.size() - 1);
803             }
804         }
805
806         ostringstream paramsList;
807         if (queryParams.size() > 0)
808         {
809             paramsList << '?';
810         }
811
812         for (auto& param : queryParams)
813         {
814             for (auto& paramList : param.second)
815             {
816                 paramsList << param.first << '=' << paramList << ';';
817             }
818         }
819
820         std::string queryString = paramsList.str();
821
822         if (queryString.empty())
823         {
824             return uri;
825         }
826
827         if (queryString.back() == ';')
828         {
829             queryString.resize(queryString.size() - 1);
830         }
831
832         std::string ret = uri + queryString;
833         return ret;
834     }
835
836     OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
837     {
838         MessageContainer ocInfo;
839         ocInfo.addRepresentation(rep);
840         for(const OCRepresentation& r : rep.getChildren())
841         {
842             ocInfo.addRepresentation(r);
843         }
844
845         return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
846     }
847
848     OCStackResult InProcClientWrapper::PostResourceRepresentation(
849         const OCDevAddr& devAddr,
850         const std::string& uri,
851         const OCRepresentation& rep,
852         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
853         OCConnectivityType connectivityType,
854         PostCallback& callback, QualityOfService QoS)
855     {
856         if (!callback)
857         {
858             return OC_STACK_INVALID_PARAM;
859         }
860         OCStackResult result;
861         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
862         OCCallbackData cbdata;
863         cbdata.context = static_cast<void*>(ctx),
864         cbdata.cb      = setResourceCallback;
865         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
866
867
868         std::string url = assembleSetResourceUri(uri, queryParams);
869
870         auto cLock = m_csdkLock.lock();
871
872         if (cLock)
873         {
874             std::lock_guard<std::recursive_mutex> lock(*cLock);
875             OCHeaderOption options[MAX_HEADER_OPTIONS];
876
877             result = OCDoResource(nullptr, OC_REST_POST,
878                                   url.c_str(), &devAddr,
879                                   assembleSetResourcePayload(rep),
880                                   connectivityType,
881                                   static_cast<OCQualityOfService>(QoS),
882                                   &cbdata,
883                                   assembleHeaderOptions(options, headerOptions),
884                                   headerOptions.size());
885         }
886         else
887         {
888             delete ctx;
889             result = OC_STACK_ERROR;
890         }
891
892         return result;
893     }
894
895     OCStackResult InProcClientWrapper::PutResourceRepresentation(
896         const OCDevAddr& devAddr,
897         const std::string& uri,
898         const OCRepresentation& rep,
899         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
900         PutCallback& callback, QualityOfService QoS)
901     {
902         if (!callback)
903         {
904             return OC_STACK_INVALID_PARAM;
905         }
906         OCStackResult result;
907         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
908         OCCallbackData cbdata;
909         cbdata.context = static_cast<void*>(ctx),
910         cbdata.cb      = setResourceCallback;
911         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
912
913
914         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
915
916         auto cLock = m_csdkLock.lock();
917
918         if (cLock)
919         {
920             std::lock_guard<std::recursive_mutex> lock(*cLock);
921             OCDoHandle handle;
922             OCHeaderOption options[MAX_HEADER_OPTIONS];
923
924             result = OCDoResource(&handle, OC_REST_PUT,
925                                   url.c_str(), &devAddr,
926                                   assembleSetResourcePayload(rep),
927                                   CT_DEFAULT,
928                                   static_cast<OCQualityOfService>(QoS),
929                                   &cbdata,
930                                   assembleHeaderOptions(options, headerOptions),
931                                   headerOptions.size());
932         }
933         else
934         {
935             delete ctx;
936             result = OC_STACK_ERROR;
937         }
938
939         return result;
940     }
941
942     OCStackApplicationResult deleteResourceCallback(void* ctx,
943                                                     OCDoHandle /*handle*/,
944         OCClientResponse* clientResponse)
945     {
946         ClientCallbackContext::DeleteContext* context =
947             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
948         HeaderOptions serverHeaderOptions;
949
950         if (clientResponse->result == OC_STACK_OK)
951         {
952             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
953         }
954         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
955         exec.detach();
956         return OC_STACK_DELETE_TRANSACTION;
957     }
958
959     OCStackResult InProcClientWrapper::DeleteResource(
960         const OCDevAddr& devAddr,
961         const std::string& uri,
962         const HeaderOptions& headerOptions,
963         OCConnectivityType connectivityType,
964         DeleteCallback& callback,
965         QualityOfService /*QoS*/)
966     {
967         if (!callback)
968         {
969             return OC_STACK_INVALID_PARAM;
970         }
971         OCStackResult result;
972         ClientCallbackContext::DeleteContext* ctx =
973             new ClientCallbackContext::DeleteContext(callback);
974         OCCallbackData cbdata;
975         cbdata.context = static_cast<void*>(ctx),
976         cbdata.cb      = deleteResourceCallback;
977         cbdata.cd      = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
978
979
980         auto cLock = m_csdkLock.lock();
981
982         if (cLock)
983         {
984             OCHeaderOption options[MAX_HEADER_OPTIONS];
985
986             std::lock_guard<std::recursive_mutex> lock(*cLock);
987
988             result = OCDoResource(nullptr, OC_REST_DELETE,
989                                   uri.c_str(), &devAddr,
990                                   nullptr,
991                                   connectivityType,
992                                   static_cast<OCQualityOfService>(m_cfg.QoS),
993                                   &cbdata,
994                                   assembleHeaderOptions(options, headerOptions),
995                                   headerOptions.size());
996         }
997         else
998         {
999             delete ctx;
1000             result = OC_STACK_ERROR;
1001         }
1002
1003         return result;
1004     }
1005
1006     OCStackApplicationResult observeResourceCallback(void* ctx,
1007                                                      OCDoHandle /*handle*/,
1008         OCClientResponse* clientResponse)
1009     {
1010         ClientCallbackContext::ObserveContext* context =
1011             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
1012         OCRepresentation attrs;
1013         HeaderOptions serverHeaderOptions;
1014         uint32_t sequenceNumber = clientResponse->sequenceNumber;
1015         OCStackResult result = clientResponse->result;
1016         if (clientResponse->result == OC_STACK_OK)
1017         {
1018             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
1019             try
1020             {
1021                 attrs = parseGetSetCallback(clientResponse);
1022             }
1023             catch(OC::OCException& e)
1024             {
1025                 result = e.code();
1026             }
1027         }
1028         std::thread exec(context->callback, serverHeaderOptions, attrs,
1029                     result, sequenceNumber);
1030         exec.detach();
1031         if (sequenceNumber == MAX_SEQUENCE_NUMBER + 1)
1032         {
1033             return OC_STACK_DELETE_TRANSACTION;
1034         }
1035
1036         return OC_STACK_KEEP_TRANSACTION;
1037     }
1038
1039     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
1040         const OCDevAddr& devAddr,
1041         const std::string& uri,
1042         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
1043         ObserveCallback& callback, QualityOfService QoS)
1044     {
1045         if (!callback)
1046         {
1047             return OC_STACK_INVALID_PARAM;
1048         }
1049         OCStackResult result;
1050
1051         ClientCallbackContext::ObserveContext* ctx =
1052             new ClientCallbackContext::ObserveContext(callback);
1053         OCCallbackData cbdata;
1054         cbdata.context = static_cast<void*>(ctx),
1055         cbdata.cb      = observeResourceCallback;
1056         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1057
1058
1059         OCMethod method;
1060         if (observeType == ObserveType::Observe)
1061         {
1062             method = OC_REST_OBSERVE;
1063         }
1064         else if (observeType == ObserveType::ObserveAll)
1065         {
1066             method = OC_REST_OBSERVE_ALL;
1067         }
1068         else
1069         {
1070             method = OC_REST_OBSERVE_ALL;
1071         }
1072
1073         std::string url = assembleSetResourceUri(uri, queryParams).c_str();
1074
1075         auto cLock = m_csdkLock.lock();
1076
1077         if (cLock)
1078         {
1079             std::lock_guard<std::recursive_mutex> lock(*cLock);
1080             OCHeaderOption options[MAX_HEADER_OPTIONS];
1081
1082             result = OCDoResource(handle, method,
1083                                   url.c_str(), &devAddr,
1084                                   nullptr,
1085                                   CT_DEFAULT,
1086                                   static_cast<OCQualityOfService>(QoS),
1087                                   &cbdata,
1088                                   assembleHeaderOptions(options, headerOptions),
1089                                   headerOptions.size());
1090         }
1091         else
1092         {
1093             delete ctx;
1094             return OC_STACK_ERROR;
1095         }
1096
1097         return result;
1098     }
1099
1100     OCStackResult InProcClientWrapper::CancelObserveResource(
1101             OCDoHandle handle,
1102             const std::string& /*host*/,
1103             const std::string& /*uri*/,
1104             const HeaderOptions& headerOptions,
1105             QualityOfService QoS)
1106     {
1107         OCStackResult result;
1108         auto cLock = m_csdkLock.lock();
1109
1110         if (cLock)
1111         {
1112             std::lock_guard<std::recursive_mutex> lock(*cLock);
1113             OCHeaderOption options[MAX_HEADER_OPTIONS];
1114
1115             result = OCCancel(handle,
1116                     static_cast<OCQualityOfService>(QoS),
1117                     assembleHeaderOptions(options, headerOptions),
1118                     headerOptions.size());
1119         }
1120         else
1121         {
1122             result = OC_STACK_ERROR;
1123         }
1124
1125         return result;
1126     }
1127
1128     OCStackApplicationResult subscribePresenceCallback(void* ctx,
1129                                                        OCDoHandle /*handle*/,
1130             OCClientResponse* clientResponse)
1131     {
1132         ClientCallbackContext::SubscribePresenceContext* context =
1133         static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
1134
1135         /*
1136          * This a hack while we rethink presence subscription.
1137          */
1138         std::string url = clientResponse->devAddr.addr;
1139
1140         std::thread exec(context->callback, clientResponse->result,
1141                     clientResponse->sequenceNumber, url);
1142
1143         exec.detach();
1144
1145         return OC_STACK_KEEP_TRANSACTION;
1146     }
1147
1148     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
1149         const std::string& host, const std::string& resourceType,
1150         OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
1151     {
1152         if (!presenceHandler)
1153         {
1154             return OC_STACK_INVALID_PARAM;
1155         }
1156
1157         ClientCallbackContext::SubscribePresenceContext* ctx =
1158             new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
1159         OCCallbackData cbdata;
1160         cbdata.context = static_cast<void*>(ctx),
1161         cbdata.cb      = subscribePresenceCallback;
1162         cbdata.cd      = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
1163
1164
1165         auto cLock = m_csdkLock.lock();
1166
1167         std::ostringstream os;
1168         os << host << OC_RSRVD_PRESENCE_URI;
1169
1170         if (!resourceType.empty())
1171         {
1172             os << "?rt=" << resourceType;
1173         }
1174
1175         if (!cLock)
1176         {
1177             delete ctx;
1178             return OC_STACK_ERROR;
1179         }
1180
1181         return OCDoResource(handle, OC_REST_PRESENCE,
1182                             os.str().c_str(), nullptr,
1183                             nullptr, connectivityType,
1184                             OC_LOW_QOS, &cbdata, NULL, 0);
1185     }
1186
1187     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
1188     {
1189         OCStackResult result;
1190         auto cLock = m_csdkLock.lock();
1191
1192         if (cLock)
1193         {
1194             std::lock_guard<std::recursive_mutex> lock(*cLock);
1195             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
1196         }
1197         else
1198         {
1199             result = OC_STACK_ERROR;
1200         }
1201
1202         return result;
1203     }
1204
1205 #ifdef WITH_CLOUD
1206     OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
1207                                                                const std::string& host,
1208                                                                const std::vector<std::string>& di,
1209                                                                OCConnectivityType connectivityType,
1210                                                                ObserveCallback& callback)
1211     {
1212         if (!callback)
1213         {
1214             return OC_STACK_INVALID_PARAM;
1215         }
1216         OCStackResult result;
1217
1218         ClientCallbackContext::ObserveContext* ctx =
1219             new ClientCallbackContext::ObserveContext(callback);
1220         OCCallbackData cbdata;
1221         cbdata.context = static_cast<void*>(ctx),
1222         cbdata.cb      = observeResourceCallback;
1223         cbdata.cd      = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
1224
1225         auto cLock = m_csdkLock.lock();
1226
1227         if (cLock)
1228         {
1229             std::lock_guard<std::recursive_mutex> lock(*cLock);
1230
1231             std::ostringstream os;
1232             os << host << OC_RSRVD_DEVICE_PRESENCE_URI;
1233             QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
1234             std::string url = assembleSetResourceUri(os.str(), queryParams);
1235
1236             result = OCDoResource(handle, OC_REST_OBSERVE,
1237                                   url.c_str(), nullptr,
1238                                   nullptr, connectivityType,
1239                                   OC_LOW_QOS, &cbdata,
1240                                   nullptr, 0);
1241         }
1242         else
1243         {
1244             delete ctx;
1245             result = OC_STACK_ERROR;
1246         }
1247
1248         return result;
1249     }
1250 #endif
1251
1252     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
1253     {
1254         qos = m_cfg.QoS;
1255         return OC_STACK_OK;
1256     }
1257
1258     OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
1259            const HeaderOptions& headerOptions)
1260     {
1261         int i = 0;
1262
1263         if ( headerOptions.size() == 0)
1264         {
1265             return nullptr;
1266         }
1267
1268         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
1269         {
1270             options[i] = OCHeaderOption();
1271             options[i].protocolID = OC_COAP_ID;
1272             options[i].optionID = it->getOptionID();
1273             options[i].optionLength = it->getOptionData().length() + 1;
1274             strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));
1275             i++;
1276         }
1277
1278         return options;
1279     }
1280
1281     std::shared_ptr<OCDirectPairing> cloneDevice(const OCDPDev_t* dev)
1282     {
1283         if (!dev)
1284         {
1285             return nullptr;
1286         }
1287
1288         OCDPDev_t* result = new OCDPDev_t(*dev);
1289         result->prm = new OCPrm_t[dev->prmLen];
1290         memcpy(result->prm, dev->prm, sizeof(OCPrm_t)*dev->prmLen);
1291         return std::shared_ptr<OCDirectPairing>(new OCDirectPairing(result));
1292     }
1293
1294     void InProcClientWrapper::convert(const OCDPDev_t *list, PairedDevices& dpList)
1295     {
1296         while(list)
1297         {
1298             dpList.push_back(cloneDevice(list));
1299             list = list->next;
1300         }
1301     }
1302
1303     OCStackResult InProcClientWrapper::FindDirectPairingDevices(unsigned short waittime,
1304             GetDirectPairedCallback& callback)
1305     {
1306         if (!callback || 0 == waittime)
1307         {
1308             return OC_STACK_INVALID_PARAM;
1309         }
1310
1311         OCStackResult result = OC_STACK_ERROR;
1312         const OCDPDev_t *list = nullptr;
1313         PairedDevices dpDeviceList;
1314
1315         auto cLock = m_csdkLock.lock();
1316
1317         if (cLock)
1318         {
1319             std::lock_guard<std::recursive_mutex> lock(*cLock);
1320
1321             list = OCDiscoverDirectPairingDevices(waittime);
1322             if (NULL == list)
1323             {
1324                 result = OC_STACK_NO_RESOURCE;
1325                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1326                     << std::flush;
1327             }
1328             else {
1329                 convert(list, dpDeviceList);
1330                 std::thread exec(callback, dpDeviceList);
1331                 exec.detach();
1332                 result = OC_STACK_OK;
1333             }
1334         }
1335         else
1336         {
1337             result = OC_STACK_ERROR;
1338         }
1339
1340         return result;
1341     }
1342
1343     OCStackResult InProcClientWrapper::GetDirectPairedDevices(GetDirectPairedCallback& callback)
1344     {
1345         if (!callback)
1346         {
1347             return OC_STACK_INVALID_PARAM;
1348         }
1349
1350         OCStackResult result = OC_STACK_ERROR;
1351         const OCDPDev_t *list = nullptr;
1352         PairedDevices dpDeviceList;
1353
1354         auto cLock = m_csdkLock.lock();
1355
1356         if (cLock)
1357         {
1358             std::lock_guard<std::recursive_mutex> lock(*cLock);
1359
1360             list = OCGetDirectPairedDevices();
1361             if (NULL == list)
1362             {
1363                 result = OC_STACK_NO_RESOURCE;
1364                 oclog() << "findDirectPairingDevices(): No device found for direct pairing"
1365                     << std::flush;
1366             }
1367             else {
1368                 convert(list, dpDeviceList);
1369                 std::thread exec(callback, dpDeviceList);
1370                 exec.detach();
1371                 result = OC_STACK_OK;
1372             }
1373         }
1374         else
1375         {
1376             result = OC_STACK_ERROR;
1377         }
1378
1379         return result;
1380     }
1381
1382     void directPairingCallback(void *ctx, OCDPDev_t *peer,
1383             OCStackResult result)
1384     {
1385
1386         ClientCallbackContext::DirectPairingContext* context =
1387             static_cast<ClientCallbackContext::DirectPairingContext*>(ctx);
1388
1389         std::thread exec(context->callback, cloneDevice(peer), result);
1390         exec.detach();
1391     }
1392
1393     OCStackResult InProcClientWrapper::DoDirectPairing(std::shared_ptr<OCDirectPairing> peer,
1394             const OCPrm_t& pmSel, const std::string& pinNumber, DirectPairingCallback& callback)
1395     {
1396         if (!peer || !callback)
1397         {
1398             oclog() << "Invalid parameters" << std::flush;
1399             return OC_STACK_INVALID_PARAM;
1400         }
1401
1402         OCStackResult result = OC_STACK_ERROR;
1403         ClientCallbackContext::DirectPairingContext* context =
1404             new ClientCallbackContext::DirectPairingContext(callback);
1405
1406         auto cLock = m_csdkLock.lock();
1407         if (cLock)
1408         {
1409             std::lock_guard<std::recursive_mutex> lock(*cLock);
1410             result = OCDoDirectPairing(static_cast<void*>(context), peer->getDev(),
1411                     pmSel, const_cast<char*>(pinNumber.c_str()), directPairingCallback);
1412         }
1413         else
1414         {
1415             delete context;
1416             result = OC_STACK_ERROR;
1417         }
1418         return result;
1419     }
1420 }