Merge branch 'master' into connectivity-abstraction
[platform/upstream/iotivity.git] / resource / src / InProcClientWrapper.cpp
1 //******************************************************************
2 //
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "InProcClientWrapper.h"
22 #include "ocstack.h"
23
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
27 using namespace std;
28
29 namespace OC
30 {
31     InProcClientWrapper::InProcClientWrapper(
32         std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33             : m_threadRun(false), m_csdkLock(csdkLock),
34               m_cfg { cfg }
35     {
36         // if the config type is server, we ought to never get called.  If the config type
37         // is both, we count on the server to run the thread and do the initialize
38
39         if(m_cfg.mode == ModeType::Client)
40         {
41             OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
42
43             if(OC_STACK_OK != result)
44             {
45                 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
46             }
47
48             m_threadRun = true;
49             m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
50         }
51     }
52
53     InProcClientWrapper::~InProcClientWrapper()
54     {
55         if(m_threadRun && m_listeningThread.joinable())
56         {
57             m_threadRun = false;
58             m_listeningThread.join();
59         }
60
61         OCStop();
62     }
63
64     void InProcClientWrapper::listeningFunc()
65     {
66         while(m_threadRun)
67         {
68             OCStackResult result;
69             auto cLock = m_csdkLock.lock();
70             if(cLock)
71             {
72                 std::lock_guard<std::recursive_mutex> lock(*cLock);
73                 result = OCProcess();
74             }
75             else
76             {
77                 result = OC_STACK_ERROR;
78             }
79
80             if(result != OC_STACK_OK)
81             {
82                 // TODO: do something with result if failed?
83             }
84
85             // To minimize CPU utilization we may wish to do this with sleep
86             std::this_thread::sleep_for(std::chrono::milliseconds(10));
87         }
88     }
89
90     OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
91         OCClientResponse* clientResponse)
92     {
93         ClientCallbackContext::ListenContext* context =
94             static_cast<ClientCallbackContext::ListenContext*>(ctx);
95
96         if(clientResponse->result != OC_STACK_OK)
97         {
98             oclog() << "listenCallback(): failed to create resource. clientResponse: "
99                     << clientResponse->result
100                     << std::flush;
101
102             return OC_STACK_KEEP_TRANSACTION;
103         }
104
105         std::stringstream requestStream;
106         requestStream << clientResponse->resJSONPayload;
107
108         try
109         {
110             ListenOCContainer container(context->clientWrapper, *clientResponse->addr,
111                     requestStream);
112
113             // loop to ensure valid construction of all resources
114             for(auto resource : container.Resources())
115             {
116                 std::thread exec(context->callback, resource);
117                 exec.detach();
118             }
119
120         }
121         catch(const std::exception& e)
122         {
123             oclog() << "listenCallback failed to parse a malformed message: "
124                     << e.what()
125                     << std::endl <<std::endl
126                     << clientResponse->result
127                     << std::flush;
128             return OC_STACK_KEEP_TRANSACTION;
129         }
130
131         return OC_STACK_KEEP_TRANSACTION;
132
133     }
134
135     OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
136         const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
137     {
138         OCStackResult result;
139
140         OCCallbackData cbdata = {0};
141
142         ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
143         context->callback = callback;
144         context->clientWrapper = shared_from_this();
145
146         cbdata.context =  static_cast<void*>(context);
147         cbdata.cb = listenCallback;
148         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
149
150         auto cLock = m_csdkLock.lock();
151         if(cLock)
152         {
153             std::lock_guard<std::recursive_mutex> lock(*cLock);
154             OCDoHandle handle;
155             result = OCDoResource(&handle, OC_REST_GET,
156                                   resourceType.c_str(),
157                                   nullptr, nullptr,
158                                   static_cast<OCQualityOfService>(QoS),
159                                   &cbdata,
160                                   NULL, 0);
161         }
162         else
163         {
164             result = OC_STACK_ERROR;
165         }
166         return result;
167     }
168
169     OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
170     {
171         if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
172         {
173             return OCRepresentation();
174         }
175
176         MessageContainer oc;
177         oc.setJSONRepresentation(clientResponse->resJSONPayload);
178
179         std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
180         if(it == oc.representations().end())
181         {
182             return OCRepresentation();
183         }
184
185         // first one is considered the root, everything else is considered a child of this one.
186         OCRepresentation root = *it;
187         ++it;
188
189         std::for_each(it, oc.representations().end(),
190                 [&root](const OCRepresentation& repItr)
191                 {root.addChild(repItr);});
192         return root;
193
194     }
195
196     void parseServerHeaderOptions(OCClientResponse* clientResponse,
197                     HeaderOptions& serverHeaderOptions)
198     {
199         if(clientResponse)
200         {
201             // Parse header options from server
202             uint16_t optionID;
203             std::string optionData;
204
205             for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
206             {
207                 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
208                 optionData = reinterpret_cast<const char*>
209                                 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
210                 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
211                 serverHeaderOptions.push_back(headerOption);
212             }
213         }
214         else
215         {
216             // clientResponse is invalid
217             // TODO check proper logging
218             std::cout << " Invalid response " << std::endl;
219         }
220     }
221
222     OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
223         OCClientResponse* clientResponse)
224     {
225         ClientCallbackContext::GetContext* context =
226             static_cast<ClientCallbackContext::GetContext*>(ctx);
227
228         OCRepresentation rep;
229         HeaderOptions serverHeaderOptions;
230         if(clientResponse->result == OC_STACK_OK)
231         {
232             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
233             rep = parseGetSetCallback(clientResponse);
234         }
235
236         std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
237         exec.detach();
238         return OC_STACK_DELETE_TRANSACTION;
239     }
240
241     OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
242         const std::string& uri, const QueryParamsMap& queryParams,
243         const HeaderOptions& headerOptions, GetCallback& callback,
244         QualityOfService QoS)
245     {
246         OCStackResult result;
247         OCCallbackData cbdata = {0};
248
249         ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
250         ctx->callback = callback;
251         cbdata.context = static_cast<void*>(ctx);
252         cbdata.cb = &getResourceCallback;
253         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
254
255         auto cLock = m_csdkLock.lock();
256
257         if(cLock)
258         {
259             std::ostringstream os;
260             os << host << assembleSetResourceUri(uri, queryParams).c_str();
261
262             std::lock_guard<std::recursive_mutex> lock(*cLock);
263             OCDoHandle handle;
264             OCHeaderOption options[MAX_HEADER_OPTIONS];
265
266             assembleHeaderOptions(options, headerOptions);
267             result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
268                                   nullptr, nullptr,
269                                   static_cast<OCQualityOfService>(QoS),
270                                   &cbdata,
271                                   options, headerOptions.size());
272         }
273         else
274         {
275             result = OC_STACK_ERROR;
276         }
277         return result;
278     }
279
280
281     OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
282         OCClientResponse* clientResponse)
283     {
284         ClientCallbackContext::SetContext* context =
285             static_cast<ClientCallbackContext::SetContext*>(ctx);
286         OCRepresentation attrs;
287         HeaderOptions serverHeaderOptions;
288
289         if (OC_STACK_OK               == clientResponse->result ||
290             OC_STACK_RESOURCE_CREATED == clientResponse->result ||
291             OC_STACK_RESOURCE_DELETED == clientResponse->result)
292         {
293             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
294             attrs = parseGetSetCallback(clientResponse);
295         }
296
297         std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
298         exec.detach();
299         return OC_STACK_DELETE_TRANSACTION;
300     }
301
302     std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
303         const QueryParamsMap& queryParams)
304     {
305         if(uri.back() == '/')
306         {
307             uri.resize(uri.size()-1);
308         }
309
310         ostringstream paramsList;
311         if(queryParams.size() > 0)
312         {
313             paramsList << '?';
314         }
315
316         for(auto& param : queryParams)
317         {
318             paramsList << param.first <<'='<<param.second<<'&';
319         }
320
321         std::string queryString = paramsList.str();
322         if(queryString.back() == '&')
323         {
324             queryString.resize(queryString.size() - 1);
325         }
326
327         std::string ret = uri + queryString;
328         return ret;
329     }
330
331     std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
332     {
333         MessageContainer ocInfo;
334         ocInfo.addRepresentation(rep);
335         return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
336     }
337
338     OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
339         const std::string& uri, const OCRepresentation& rep,
340         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
341         PostCallback& callback, QualityOfService QoS)
342     {
343         OCStackResult result;
344         OCCallbackData cbdata = {0};
345
346         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
347         ctx->callback = callback;
348         cbdata.cb = &setResourceCallback;
349         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
350         cbdata.context = static_cast<void*>(ctx);
351
352         // TODO: in the future the cstack should be combining these two strings!
353         ostringstream os;
354         os << host << assembleSetResourceUri(uri, queryParams).c_str();
355         // TODO: end of above
356
357         auto cLock = m_csdkLock.lock();
358
359         if(cLock)
360         {
361             std::lock_guard<std::recursive_mutex> lock(*cLock);
362             OCHeaderOption options[MAX_HEADER_OPTIONS];
363             OCDoHandle handle;
364
365             assembleHeaderOptions(options, headerOptions);
366             result = OCDoResource(&handle, OC_REST_POST,
367                                   os.str().c_str(), nullptr,
368                                   assembleSetResourcePayload(rep).c_str(),
369                                   static_cast<OCQualityOfService>(QoS),
370                                   &cbdata, options, headerOptions.size());
371         }
372         else
373         {
374             result = OC_STACK_ERROR;
375         }
376
377         return result;
378     }
379
380
381     OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
382         const std::string& uri, const OCRepresentation& rep,
383         const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
384         PutCallback& callback, QualityOfService QoS)
385     {
386         OCStackResult result;
387         OCCallbackData cbdata = {0};
388
389         ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
390         ctx->callback = callback;
391         cbdata.cb = &setResourceCallback;
392         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
393         cbdata.context = static_cast<void*>(ctx);
394
395         // TODO: in the future the cstack should be combining these two strings!
396         ostringstream os;
397         os << host << assembleSetResourceUri(uri, queryParams).c_str();
398         // TODO: end of above
399
400         auto cLock = m_csdkLock.lock();
401
402         if(cLock)
403         {
404             std::lock_guard<std::recursive_mutex> lock(*cLock);
405             OCDoHandle handle;
406             OCHeaderOption options[MAX_HEADER_OPTIONS];
407
408             assembleHeaderOptions(options, headerOptions);
409             result = OCDoResource(&handle, OC_REST_PUT,
410                                   os.str().c_str(), nullptr,
411                                   assembleSetResourcePayload(rep).c_str(),
412                                   static_cast<OCQualityOfService>(QoS),
413                                   &cbdata,
414                                   options, headerOptions.size());
415         }
416         else
417         {
418             result = OC_STACK_ERROR;
419         }
420
421         return result;
422     }
423
424     OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
425         OCClientResponse* clientResponse)
426     {
427         ClientCallbackContext::DeleteContext* context =
428             static_cast<ClientCallbackContext::DeleteContext*>(ctx);
429         HeaderOptions serverHeaderOptions;
430
431         if(clientResponse->result == OC_STACK_OK)
432         {
433             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
434         }
435         std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
436         exec.detach();
437         return OC_STACK_DELETE_TRANSACTION;
438     }
439
440     OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
441         const std::string& uri, const HeaderOptions& headerOptions,
442          DeleteCallback& callback, QualityOfService QoS)
443     {
444         OCStackResult result;
445         OCCallbackData cbdata = {0};
446
447         ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
448         ctx->callback = callback;
449         cbdata.cb = &deleteResourceCallback;
450         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
451         cbdata.context = static_cast<void*>(ctx);
452
453         ostringstream os;
454         os << host << uri;
455
456         auto cLock = m_csdkLock.lock();
457
458         if(cLock)
459         {
460             OCHeaderOption options[MAX_HEADER_OPTIONS];
461             OCDoHandle handle;
462
463             assembleHeaderOptions(options, headerOptions);
464
465             std::lock_guard<std::recursive_mutex> lock(*cLock);
466
467             result = OCDoResource(&handle, OC_REST_DELETE,
468                                   os.str().c_str(), nullptr,
469                                   nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
470                                   &cbdata, options, headerOptions.size());
471         }
472         else
473         {
474             result = OC_STACK_ERROR;
475         }
476
477         return result;
478     }
479
480     OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
481         OCClientResponse* clientResponse)
482     {
483         ClientCallbackContext::ObserveContext* context =
484             static_cast<ClientCallbackContext::ObserveContext*>(ctx);
485         OCRepresentation attrs;
486         HeaderOptions serverHeaderOptions;
487         uint32_t sequenceNumber = clientResponse->sequenceNumber;
488
489         if(clientResponse->result == OC_STACK_OK)
490         {
491             parseServerHeaderOptions(clientResponse, serverHeaderOptions);
492             attrs = parseGetSetCallback(clientResponse);
493         }
494         std::thread exec(context->callback, serverHeaderOptions, attrs,
495                     clientResponse->result, sequenceNumber);
496         exec.detach();
497         return OC_STACK_KEEP_TRANSACTION;
498     }
499
500     OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
501         const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
502         const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
503     {
504         OCStackResult result;
505         OCCallbackData cbdata = {0};
506
507         ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
508         ctx->callback = callback;
509         cbdata.context = static_cast<void*>(ctx);
510         cbdata.cb = &observeResourceCallback;
511         cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
512
513         OCMethod method;
514         if (observeType == ObserveType::Observe)
515         {
516             method = OC_REST_OBSERVE;
517         }
518         else if (observeType == ObserveType::ObserveAll)
519         {
520             method = OC_REST_OBSERVE_ALL;
521         }
522         else
523         {
524             method = OC_REST_OBSERVE_ALL;
525         }
526
527         auto cLock = m_csdkLock.lock();
528
529         if(cLock)
530         {
531             std::ostringstream os;
532             os << host << assembleSetResourceUri(uri, queryParams).c_str();
533
534             std::lock_guard<std::recursive_mutex> lock(*cLock);
535             OCHeaderOption options[MAX_HEADER_OPTIONS];
536
537             assembleHeaderOptions(options, headerOptions);
538             result = OCDoResource(handle, method,
539                                   os.str().c_str(), nullptr,
540                                   nullptr,
541                                   static_cast<OCQualityOfService>(QoS),
542                                   &cbdata,
543                                   options, headerOptions.size());
544         }
545         else
546         {
547             return OC_STACK_ERROR;
548         }
549
550         return result;
551     }
552
553     OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
554         const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
555         QualityOfService QoS)
556     {
557         OCStackResult result;
558         auto cLock = m_csdkLock.lock();
559
560         if(cLock)
561         {
562             std::lock_guard<std::recursive_mutex> lock(*cLock);
563             OCHeaderOption options[MAX_HEADER_OPTIONS];
564
565             assembleHeaderOptions(options, headerOptions);
566             result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
567                     headerOptions.size());
568         }
569         else
570         {
571             result = OC_STACK_ERROR;
572         }
573
574         return result;
575     }
576
577     OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
578         OCClientResponse* clientResponse)
579     {
580         ClientCallbackContext::SubscribePresenceContext* context =
581             static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
582         std::thread exec(context->callback, clientResponse->result, clientResponse->sequenceNumber);
583
584         exec.detach();
585         return OC_STACK_KEEP_TRANSACTION;
586     }
587
588     OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
589         const std::string& host, const std::string& resourceType,
590         SubscribeCallback& presenceHandler)
591     {
592         OCCallbackData cbdata = {0};
593
594         ClientCallbackContext::SubscribePresenceContext* ctx =
595             new ClientCallbackContext::SubscribePresenceContext();
596         ctx->callback = presenceHandler;
597         cbdata.cb = &subscribePresenceCallback;
598         cbdata.context = static_cast<void*>(ctx);
599         cbdata.cd = [](void* c)
600             {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
601         auto cLock = m_csdkLock.lock();
602
603         std::ostringstream os;
604         os << host << "/oc/presence";
605
606         if(!resourceType.empty())
607         {
608             os << "?rt=" << resourceType;
609         }
610
611         if(!cLock)
612             return OC_STACK_ERROR;
613
614         return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
615                             OC_LOW_QOS, &cbdata, NULL, 0);
616     }
617
618     OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
619     {
620         OCStackResult result;
621         auto cLock = m_csdkLock.lock();
622
623         if(cLock)
624         {
625             std::lock_guard<std::recursive_mutex> lock(*cLock);
626             result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
627         }
628         else
629         {
630             result = OC_STACK_ERROR;
631         }
632
633         return result;
634     }
635
636     OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
637     {
638         qos = m_cfg.QoS;
639         return OC_STACK_OK;
640     }
641
642     void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
643            const HeaderOptions& headerOptions)
644     {
645         int i = 0;
646
647         for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
648         {
649             options[i].protocolID = OC_COAP_ID;
650             options[i].optionID = static_cast<uint16_t>(it->getOptionID());
651             options[i].optionLength = (it->getOptionData()).length() + 1;
652             memcpy(options[i].optionData, (it->getOptionData()).c_str(),
653                     (it->getOptionData()).length() + 1);
654             i++;
655         }
656     }
657 }