Merge changes I56f29eac,Ic7530d44
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         OCStop();
62     }
63
64     void InProcClientWrapper::listeningFunc()
65     {
66         while(m_threadRun)
67         {
68             OCStackResult result;
69             auto cLock = m_csdkLock.lock();
70             if(cLock)
71             {
72                 std::lock_guard<std::recursive_mutex> lock(*cLock);
73                 result = OCProcess();
74             }
75             else
76             {
77                 result = OC_STACK_ERROR;
78             }
79
80             if(result != OC_STACK_OK)
81             {
82                 // TODO: do something with result if failed?
83             }
84
85             // To minimize CPU utilization we may wish to do this with sleep
86             std::this_thread::sleep_for(std::chrono::milliseconds(10));
87         }
88     }
89
90     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
91         OCClientResponse* clientResponse)
92     {
93         ClientCallbackContext::ListenContext* context =
94             static_cast<ClientCallbackContext::ListenContext*>(ctx);
95
96         if(clientResponse->result != OC_STACK_OK)
97         {
98             oclog() << "listenCallback(): failed to create resource. clientResponse: "
99                     << clientResponse->result
100                     << std::flush;
101
102             return OC_STACK_KEEP_TRANSACTION;
103         }
104
105         std::stringstream requestStream;
106         requestStream << clientResponse->resJSONPayload;
107
108         try
109         {
110             ListenOCContainer container(context->clientWrapper, *clientResponse->addr,
111                     requestStream);
112
113             // loop to ensure valid construction of all resources
114             for(auto resource : container.Resources())
115             {
116                 std::thread exec(context->callback, resource);
117                 exec.detach();
118             }
119
120         }
121         catch(const std::exception& e)
122         {
123             oclog() << "listenCallback failed to parse a malformed message: "
124                     << e.what()
125                     << std::endl <<std::endl
126                     << clientResponse->result
127                     << std::flush;
128             return OC_STACK_KEEP_TRANSACTION;
129         }
130
131         return OC_STACK_KEEP_TRANSACTION;
132
133     }
134
135     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
136         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
137     {
138         OCStackResult result;
139
140         OCCallbackData cbdata = {0};
141
142         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
143         context->callback = callback;
144         context->clientWrapper = shared_from_this();
145
146         cbdata.context =  static_cast<void*>(context);
147         cbdata.cb = listenCallback;
148         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
149
150         auto cLock = m_csdkLock.lock();
151         if(cLock)
152         {
153             std::lock_guard<std::recursive_mutex> lock(*cLock);
154             OCDoHandle handle;
155             result = OCDoResource(&handle, OC_REST_GET,
156                                   resourceType.c_str(),
157                                   nullptr, nullptr,
158                                   static_cast<OCQualityOfService>(QoS),
159                                   &cbdata,
160                                   NULL, 0);
161         }
162         else
163         {
164             delete context;
165             result = OC_STACK_ERROR;
166         }
167         return result;
168     }
169
170     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
171     {
172         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
173         {
174             return OCRepresentation();
175         }
176
177         MessageContainer oc;
178         oc.setJSONRepresentation(clientResponse->resJSONPayload);
179
180         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
181         if(it == oc.representations().end())
182         {
183             return OCRepresentation();
184         }
185
186         // first one is considered the root, everything else is considered a child of this one.
187         OCRepresentation root = *it;
188         ++it;
189
190         std::for_each(it, oc.representations().end(),
191                 [&root](const OCRepresentation& repItr)
192                 {root.addChild(repItr);});
193         return root;
194
195     }
196
197     void parseServerHeaderOptions(OCClientResponse* clientResponse,
198                     HeaderOptions& serverHeaderOptions)
199     {
200         if(clientResponse)
201         {
202             // Parse header options from server
203             uint16_t optionID;
204             std::string optionData;
205
206             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
207             {
208                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
209                 optionData = reinterpret_cast<const char*>
210                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
211                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
212                 serverHeaderOptions.push_back(headerOption);
213             }
214         }
215         else
216         {
217             // clientResponse is invalid
218             // TODO check proper logging
219             std::cout << " Invalid response " << std::endl;
220         }
221     }
222
223     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
224         OCClientResponse* clientResponse)
225     {
226         ClientCallbackContext::GetContext* context =
227             static_cast<ClientCallbackContext::GetContext*>(ctx);
228
229         OCRepresentation rep;
230         HeaderOptions serverHeaderOptions;
231         if(clientResponse->result == OC_STACK_OK)
232         {
233             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
234             rep = parseGetSetCallback(clientResponse);
235         }
236
237         std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
238         exec.detach();
239         return OC_STACK_DELETE_TRANSACTION;
240     }
241
242     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
243         const std::string& uri, const QueryParamsMap& queryParams,
244         const HeaderOptions& headerOptions, GetCallback& callback,
245         QualityOfService QoS)
246     {
247         OCStackResult result;
248         OCCallbackData cbdata = {0};
249
250         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
251         ctx->callback = callback;
252         cbdata.context = static_cast<void*>(ctx);
253         cbdata.cb = &getResourceCallback;
254         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
255
256         auto cLock = m_csdkLock.lock();
257
258         if(cLock)
259         {
260             std::ostringstream os;
261             os << host << assembleSetResourceUri(uri, queryParams).c_str();
262
263             std::lock_guard<std::recursive_mutex> lock(*cLock);
264             OCDoHandle handle;
265             OCHeaderOption options[MAX_HEADER_OPTIONS];
266
267             assembleHeaderOptions(options, headerOptions);
268             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
269                                   nullptr, nullptr,
270                                   static_cast<OCQualityOfService>(QoS),
271                                   &cbdata,
272                                   options, headerOptions.size());
273         }
274         else
275         {
276             delete ctx;
277             result = OC_STACK_ERROR;
278         }
279         return result;
280     }
281
282
283     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
284         OCClientResponse* clientResponse)
285     {
286         ClientCallbackContext::SetContext* context =
287             static_cast<ClientCallbackContext::SetContext*>(ctx);
288         OCRepresentation attrs;
289         HeaderOptions serverHeaderOptions;
290
291         if (OC_STACK_OK               == clientResponse->result ||
292             OC_STACK_RESOURCE_CREATED == clientResponse->result ||
293             OC_STACK_RESOURCE_DELETED == clientResponse->result)
294         {
295             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
296             attrs = parseGetSetCallback(clientResponse);
297         }
298
299         std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
300         exec.detach();
301         return OC_STACK_DELETE_TRANSACTION;
302     }
303
304     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
305         const QueryParamsMap& queryParams)
306     {
307         if(uri.back() == '/')
308         {
309             uri.resize(uri.size()-1);
310         }
311
312         ostringstream paramsList;
313         if(queryParams.size() > 0)
314         {
315             paramsList << '?';
316         }
317
318         for(auto& param : queryParams)
319         {
320             paramsList << param.first <<'='<<param.second<<'&';
321         }
322
323         std::string queryString = paramsList.str();
324         if(queryString.back() == '&')
325         {
326             queryString.resize(queryString.size() - 1);
327         }
328
329         std::string ret = uri + queryString;
330         return ret;
331     }
332
333     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
334     {
335         MessageContainer ocInfo;
336         ocInfo.addRepresentation(rep);
337         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
338     }
339
340     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
341         const std::string& uri, const OCRepresentation& rep,
342         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
343         PostCallback& callback, QualityOfService QoS)
344     {
345         OCStackResult result;
346         OCCallbackData cbdata = {0};
347
348         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
349         ctx->callback = callback;
350         cbdata.cb = &setResourceCallback;
351         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
352         cbdata.context = static_cast<void*>(ctx);
353
354         // TODO: in the future the cstack should be combining these two strings!
355         ostringstream os;
356         os << host << assembleSetResourceUri(uri, queryParams).c_str();
357         // TODO: end of above
358
359         auto cLock = m_csdkLock.lock();
360
361         if(cLock)
362         {
363             std::lock_guard<std::recursive_mutex> lock(*cLock);
364             OCHeaderOption options[MAX_HEADER_OPTIONS];
365             OCDoHandle handle;
366
367             assembleHeaderOptions(options, headerOptions);
368             result = OCDoResource(&handle, OC_REST_POST,
369                                   os.str().c_str(), nullptr,
370                                   assembleSetResourcePayload(rep).c_str(),
371                                   static_cast<OCQualityOfService>(QoS),
372                                   &cbdata, options, headerOptions.size());
373         }
374         else
375         {
376             delete ctx;
377             result = OC_STACK_ERROR;
378         }
379
380         return result;
381     }
382
383
384     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
385         const std::string& uri, const OCRepresentation& rep,
386         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
387         PutCallback& callback, QualityOfService QoS)
388     {
389         OCStackResult result;
390         OCCallbackData cbdata = {0};
391
392         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
393         ctx->callback = callback;
394         cbdata.cb = &setResourceCallback;
395         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
396         cbdata.context = static_cast<void*>(ctx);
397
398         // TODO: in the future the cstack should be combining these two strings!
399         ostringstream os;
400         os << host << assembleSetResourceUri(uri, queryParams).c_str();
401         // TODO: end of above
402
403         auto cLock = m_csdkLock.lock();
404
405         if(cLock)
406         {
407             std::lock_guard<std::recursive_mutex> lock(*cLock);
408             OCDoHandle handle;
409             OCHeaderOption options[MAX_HEADER_OPTIONS];
410
411             assembleHeaderOptions(options, headerOptions);
412             result = OCDoResource(&handle, OC_REST_PUT,
413                                   os.str().c_str(), nullptr,
414                                   assembleSetResourcePayload(rep).c_str(),
415                                   static_cast<OCQualityOfService>(QoS),
416                                   &cbdata,
417                                   options, headerOptions.size());
418         }
419         else
420         {
421             delete ctx;
422             result = OC_STACK_ERROR;
423         }
424
425         return result;
426     }
427
428     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
429         OCClientResponse* clientResponse)
430     {
431         ClientCallbackContext::DeleteContext* context =
432             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
433         HeaderOptions serverHeaderOptions;
434
435         if(clientResponse->result == OC_STACK_OK)
436         {
437             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
438         }
439         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
440         exec.detach();
441         return OC_STACK_DELETE_TRANSACTION;
442     }
443
444     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
445         const std::string& uri, const HeaderOptions& headerOptions,
446          DeleteCallback& callback, QualityOfService QoS)
447     {
448         OCStackResult result;
449         OCCallbackData cbdata = {0};
450
451         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
452         ctx->callback = callback;
453         cbdata.cb = &deleteResourceCallback;
454         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
455         cbdata.context = static_cast<void*>(ctx);
456
457         ostringstream os;
458         os << host << uri;
459
460         auto cLock = m_csdkLock.lock();
461
462         if(cLock)
463         {
464             OCHeaderOption options[MAX_HEADER_OPTIONS];
465             OCDoHandle handle;
466
467             assembleHeaderOptions(options, headerOptions);
468
469             std::lock_guard<std::recursive_mutex> lock(*cLock);
470
471             result = OCDoResource(&handle, OC_REST_DELETE,
472                                   os.str().c_str(), nullptr,
473                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
474                                   &cbdata, options, headerOptions.size());
475         }
476         else
477         {
478             delete ctx;
479             result = OC_STACK_ERROR;
480         }
481
482         return result;
483     }
484
485     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
486         OCClientResponse* clientResponse)
487     {
488         ClientCallbackContext::ObserveContext* context =
489             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
490         OCRepresentation attrs;
491         HeaderOptions serverHeaderOptions;
492         uint32_t sequenceNumber = clientResponse->sequenceNumber;
493
494         if(clientResponse->result == OC_STACK_OK)
495         {
496             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
497             attrs = parseGetSetCallback(clientResponse);
498         }
499         std::thread exec(context->callback, serverHeaderOptions, attrs,
500                     clientResponse->result, sequenceNumber);
501         exec.detach();
502         return OC_STACK_KEEP_TRANSACTION;
503     }
504
505     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
506         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
507         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
508     {
509         OCStackResult result;
510         OCCallbackData cbdata = {0};
511
512         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
513         ctx->callback = callback;
514         cbdata.context = static_cast<void*>(ctx);
515         cbdata.cb = &observeResourceCallback;
516         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
517
518         OCMethod method;
519         if (observeType == ObserveType::Observe)
520         {
521             method = OC_REST_OBSERVE;
522         }
523         else if (observeType == ObserveType::ObserveAll)
524         {
525             method = OC_REST_OBSERVE_ALL;
526         }
527         else
528         {
529             method = OC_REST_OBSERVE_ALL;
530         }
531
532         auto cLock = m_csdkLock.lock();
533
534         if(cLock)
535         {
536             std::ostringstream os;
537             os << host << assembleSetResourceUri(uri, queryParams).c_str();
538
539             std::lock_guard<std::recursive_mutex> lock(*cLock);
540             OCHeaderOption options[MAX_HEADER_OPTIONS];
541
542             assembleHeaderOptions(options, headerOptions);
543             result = OCDoResource(handle, method,
544                                   os.str().c_str(), nullptr,
545                                   nullptr,
546                                   static_cast<OCQualityOfService>(QoS),
547                                   &cbdata,
548                                   options, headerOptions.size());
549         }
550         else
551         {
552             delete ctx;
553             return OC_STACK_ERROR;
554         }
555
556         return result;
557     }
558
559     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
560         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
561         QualityOfService QoS)
562     {
563         OCStackResult result;
564         auto cLock = m_csdkLock.lock();
565
566         if(cLock)
567         {
568             std::lock_guard<std::recursive_mutex> lock(*cLock);
569             OCHeaderOption options[MAX_HEADER_OPTIONS];
570
571             assembleHeaderOptions(options, headerOptions);
572             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
573                     headerOptions.size());
574         }
575         else
576         {
577             result = OC_STACK_ERROR;
578         }
579
580         return result;
581     }
582
583     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
584             OCClientResponse* clientResponse)
585     {
586         char stringAddress[DEV_ADDR_SIZE_MAX];
587         ostringstream os;
588         uint16_t port;
589
590         if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
591                 OCDevAddrToPort(clientResponse->addr, &port) == 0)
592         {
593             os<<stringAddress<<":"<<port;
594
595             ClientCallbackContext::SubscribePresenceContext* context =
596                 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
597
598             std::thread exec(context->callback, clientResponse->result,
599                     clientResponse->sequenceNumber, os.str());
600
601             exec.detach();
602         }
603         else
604         {
605             oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
606                     <<"failed"<< std::flush;
607         }
608         return OC_STACK_KEEP_TRANSACTION;
609     }
610
611     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
612         const std::string& host, const std::string& resourceType,
613         SubscribeCallback& presenceHandler)
614     {
615         OCCallbackData cbdata = {0};
616
617         ClientCallbackContext::SubscribePresenceContext* ctx =
618             new ClientCallbackContext::SubscribePresenceContext();
619         ctx->callback = presenceHandler;
620         cbdata.cb = &subscribePresenceCallback;
621         cbdata.context = static_cast<void*>(ctx);
622         cbdata.cd = [](void* c)
623             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
624         auto cLock = m_csdkLock.lock();
625
626         std::ostringstream os;
627         os << host << "/oc/presence";
628
629         if(!resourceType.empty())
630         {
631             os << "?rt=" << resourceType;
632         }
633
634         if(!cLock)
635         {
636             delete ctx;
637             return OC_STACK_ERROR;
638         }
639
640         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
641                             OC_LOW_QOS, &cbdata, NULL, 0);
642     }
643
644     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
645     {
646         OCStackResult result;
647         auto cLock = m_csdkLock.lock();
648
649         if(cLock)
650         {
651             std::lock_guard<std::recursive_mutex> lock(*cLock);
652             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
653         }
654         else
655         {
656             result = OC_STACK_ERROR;
657         }
658
659         return result;
660     }
661
662     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
663     {
664         qos = m_cfg.QoS;
665         return OC_STACK_OK;
666     }
667
668     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
669            const HeaderOptions& headerOptions)
670     {
671         int i = 0;
672
673         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
674         {
675             options[i].protocolID = OC_COAP_ID;
676             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
677             options[i].optionLength = (it->getOptionData()).length() + 1;
678             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
679                     (it->getOptionData()).length() + 1);
680             i++;
681         }
682     }
683 }