1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
23 #include "InProcClientWrapper.h"
26 #include "OCPlatform.h"
27 #include "OCResource.h"
33 InProcClientWrapper::InProcClientWrapper(OC::OCPlatform_impl& owner,
34 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
35 : IClientWrapper(owner),
36 m_threadRun(false), m_csdkLock(csdkLock),
40 // if the config type is server, we ought to never get called. If the config type
41 // is both, we count on the server to run the thread and do the initialize
43 if(m_cfg.mode == ModeType::Client)
45 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
47 if(OC_STACK_OK != result)
49 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
53 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
57 InProcClientWrapper::~InProcClientWrapper()
59 if(m_threadRun && m_listeningThread.joinable())
62 m_listeningThread.join();
68 void InProcClientWrapper::listeningFunc()
73 auto cLock = m_csdkLock.lock();
76 std::lock_guard<std::recursive_mutex> lock(*cLock);
81 result = OC_STACK_ERROR;
84 if(result != OC_STACK_OK)
86 // TODO: do something with result if failed?
89 // To minimize CPU utilization we may wish to do this with sleep
90 std::this_thread::sleep_for(std::chrono::milliseconds(10));
94 std::string InProcClientWrapper::convertOCAddrToString(OCDevAddr& addr,
95 OCSecureType type, const std::string &portStr)
97 // TODO: we currently assume this is a IPV4 address, need to figure out the actual value
102 if(OCDevAddrToIPv4Addr(&addr, &a, &b, &c, &d) ==0 && OCDevAddrToPort(&addr, &port)==0)
105 if(type == OCSecureType::IPV4)
107 os << "coap://" << static_cast<int>(a) << '.' <<
108 static_cast<int>(b) << '.' << static_cast<int>(c) <<
109 '.' << static_cast<int>(d) << ':' <<static_cast<int>(port);
111 else if(type == OCSecureType::IPV4Secure)
113 os << "coaps://" << static_cast<int>(a) <<'.' <<
114 static_cast<int>(b) <<'.' << static_cast<int>(c) <<
115 '.' << static_cast<int>(d) << ':' << portStr;
121 return OC::Error::INVALID_IP;
127 FindCallback callback;
128 IClientWrapper::Ptr clientWrapper;
132 std::shared_ptr<OCResource> InProcClientWrapper::parseOCResource(
133 IClientWrapper::Ptr clientWrapper, OCDevAddr& addr,
134 const boost::property_tree::ptree resourceNode)
136 std::string uri = resourceNode.get<std::string>(OC::Key::URIKEY, "");
137 bool obs = resourceNode.get<int>(OC::Key::OBSERVABLEKEY,0) == 1;
138 std::vector<std::string> rTs;
139 std::vector<std::string> ifaces;
141 boost::property_tree::ptree properties =
142 resourceNode.get_child(OC::Key::PROPERTYKEY, boost::property_tree::ptree());
144 boost::property_tree::ptree rT =
145 properties.get_child(OC::Key::RESOURCETYPESKEY, boost::property_tree::ptree());
148 rTs.push_back(itr.second.data());
150 bool secure = properties.get<int>(OC::Key::SECUREKEY,0) == 1;
152 boost::property_tree::ptree iF =
153 properties.get_child(OC::Key::INTERFACESKEY, boost::property_tree::ptree());
156 ifaces.push_back(itr.second.data());
162 string port = properties.get<string>(OC::Key::PORTKEY,"");
163 host= convertOCAddrToString(addr, OCSecureType::IPV4Secure, port);
167 host= convertOCAddrToString(addr, OCSecureType::IPV4);
170 return std::shared_ptr<OCResource>(
171 new OCResource(clientWrapper, host, uri, obs, rTs, ifaces));
174 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
175 OCClientResponse* clientResponse)
177 ListenContext* context = static_cast<ListenContext*>(ctx);
179 if(clientResponse->result != OC_STACK_OK)
181 oclog() << "listenCallback(): failed to create resource. clientResponse: "
182 << clientResponse->result
185 return OC_STACK_KEEP_TRANSACTION;
188 std::stringstream requestStream;
189 requestStream << clientResponse->resJSONPayload;
191 boost::property_tree::ptree root;
195 boost::property_tree::read_json(requestStream, root);
197 catch(boost::property_tree::json_parser::json_parser_error &e)
199 oclog() << "listenCallback(): read_json() failed: " << e.what()
202 return OC_STACK_KEEP_TRANSACTION;
205 boost::property_tree::ptree payload =
206 root.get_child(OC::Key::OCKEY, boost::property_tree::ptree());
208 for(auto payloadItr : payload)
212 std::shared_ptr<OCResource> resource =
213 context->clientWrapper->parseOCResource(context->clientWrapper,
214 *clientResponse->addr,
217 // Note: the call to detach allows the underlying thread to continue until
218 // completion and allows us to destroy the exec object. This is apparently NOT
219 // a memory leak, as the thread will apparently take care of itself.
220 // Additionally, the only parameter here is a shared ptr, so OCResource will be
221 // disposed of properly upon completion of the callback handler.
222 std::thread exec(context->callback,resource);
225 catch(ResourceInitException& e)
227 oclog() << "listenCallback(): failed to create resource: " << e.what()
232 return OC_STACK_KEEP_TRANSACTION;
235 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
236 const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
238 OCStackResult result;
240 OCCallbackData cbdata = {0};
242 ListenContext* context = new ListenContext();
243 context->callback = callback;
244 context->clientWrapper = shared_from_this();
246 cbdata.context = static_cast<void*>(context);
247 cbdata.cb = listenCallback;
248 cbdata.cd = [](void* c){delete static_cast<ListenContext*>(c);};
250 auto cLock = m_csdkLock.lock();
253 std::lock_guard<std::recursive_mutex> lock(*cLock);
255 result = OCDoResource(&handle, OC_REST_GET,
256 resourceType.c_str(),
258 static_cast<OCQualityOfService>(QoS),
264 result = OC_STACK_ERROR;
271 GetCallback callback;
276 PutCallback callback;
280 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
282 std::stringstream requestStream;
283 requestStream<<clientResponse->resJSONPayload;
284 if(strlen((char*)clientResponse->resJSONPayload) == 0)
286 return OCRepresentation();
289 boost::property_tree::ptree root;
292 boost::property_tree::read_json(requestStream, root);
294 catch(boost::property_tree::json_parser::json_parser_error &e)
296 return OCRepresentation();
298 boost::property_tree::ptree payload = root.get_child(OC::Key::OCKEY, boost::property_tree::ptree());
299 OCRepresentation root_resource;
300 std::vector<OCRepresentation> children;
302 for ( auto payloadItr : payload)
304 OCRepresentation child;
307 auto resourceNode = payloadItr.second;
308 std::string uri = resourceNode.get<std::string>(OC::Key::URIKEY, "");
312 root_resource.setUri(uri);
319 if( resourceNode.count(OC::Key::PROPERTYKEY) != 0 )
321 std::vector<std::string> rTs;
322 std::vector<std::string> ifaces;
323 boost::property_tree::ptree properties =
324 resourceNode.get_child(OC::Key::PROPERTYKEY, boost::property_tree::ptree());
326 boost::property_tree::ptree rT =
327 properties.get_child(OC::Key::RESOURCETYPESKEY,
328 boost::property_tree::ptree());
331 rTs.push_back(itr.second.data());
334 boost::property_tree::ptree iF =
335 properties.get_child(OC::Key::INTERFACESKEY, boost::property_tree::ptree());
338 ifaces.push_back(itr.second.data());
342 root_resource.setResourceInterfaces(ifaces);
343 root_resource.setResourceTypes(rTs);
347 child.setResourceInterfaces(ifaces);
348 child.setResourceTypes(rTs);
352 if( resourceNode.count(OC::Key::REPKEY) != 0 )
354 boost::property_tree::ptree rep =
355 resourceNode.get_child(OC::Key::REPKEY, boost::property_tree::ptree());
357 for( auto item : rep)
359 std::string name = item.first.data();
360 std::string value = item.second.data();
365 root_resource.setAttributeMap(attrs);
369 child.setAttributeMap(attrs);
374 children.push_back(child);
384 root_resource.setChildren(children);
386 return root_resource;
389 void parseServerHeaderOptions(OCClientResponse* clientResponse,
390 HeaderOptions& serverHeaderOptions)
394 // Parse header options from server
396 std::string optionData;
398 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
400 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
401 optionData = reinterpret_cast<const char*>
402 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
403 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
404 serverHeaderOptions.push_back(headerOption);
409 // clientResponse is invalid
410 // TODO check proper logging
411 std::cout << " Invalid response " << std::endl;
415 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
416 OCClientResponse* clientResponse)
418 GetContext* context = static_cast<GetContext*>(ctx);
420 OCRepresentation rep;
421 HeaderOptions serverHeaderOptions;
422 if(clientResponse->result == OC_STACK_OK)
424 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
425 rep = parseGetSetCallback(clientResponse);
428 std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
430 return OC_STACK_DELETE_TRANSACTION;
433 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
434 const std::string& uri, const QueryParamsMap& queryParams,
435 const HeaderOptions& headerOptions, GetCallback& callback,
436 QualityOfService QoS)
438 OCStackResult result;
439 OCCallbackData cbdata = {0};
441 GetContext* ctx = new GetContext();
442 ctx->callback = callback;
443 cbdata.context = static_cast<void*>(ctx);
444 cbdata.cb = &getResourceCallback;
445 cbdata.cd = [](void* c){delete static_cast<GetContext*>(c);};
447 auto cLock = m_csdkLock.lock();
451 std::ostringstream os;
452 os << host << assembleSetResourceUri(uri, queryParams).c_str();
454 std::lock_guard<std::recursive_mutex> lock(*cLock);
456 OCHeaderOption options[MAX_HEADER_OPTIONS];
458 assembleHeaderOptions(options, headerOptions);
459 result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
461 static_cast<OCQualityOfService>(QoS),
463 options, headerOptions.size());
467 result = OC_STACK_ERROR;
473 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
474 OCClientResponse* clientResponse)
476 SetContext* context = static_cast<SetContext*>(ctx);
477 OCRepresentation attrs;
478 HeaderOptions serverHeaderOptions;
480 if (OC_STACK_OK == clientResponse->result ||
481 OC_STACK_RESOURCE_CREATED == clientResponse->result ||
482 OC_STACK_RESOURCE_DELETED == clientResponse->result)
484 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
485 attrs = parseGetSetCallback(clientResponse);
488 std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
490 return OC_STACK_DELETE_TRANSACTION;
493 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
494 const QueryParamsMap& queryParams)
496 if(uri.back() == '/')
498 uri.resize(uri.size()-1);
501 ostringstream paramsList;
502 if(queryParams.size() > 0)
507 for(auto& param : queryParams)
509 paramsList << param.first <<'='<<param.second<<'&';
512 std::string queryString = paramsList.str();
513 if(queryString.back() == '&')
515 queryString.resize(queryString.size() - 1);
518 std::string ret = uri + queryString;
522 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
524 ostringstream payload;
525 // TODO need to change the format to "{"oc":[]}"
526 payload << "{\"oc\":";
528 payload << rep.getJSONRepresentation();
531 return payload.str();
534 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
535 const std::string& uri, const OCRepresentation& rep,
536 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
537 PostCallback& callback, QualityOfService QoS)
539 OCStackResult result;
540 OCCallbackData cbdata = {0};
542 SetContext* ctx = new SetContext();
543 ctx->callback = callback;
544 cbdata.cb = &setResourceCallback;
545 cbdata.cd = [](void* c){delete static_cast<SetContext*>(c);};
546 cbdata.context = static_cast<void*>(ctx);
548 // TODO: in the future the cstack should be combining these two strings!
550 os << host << assembleSetResourceUri(uri, queryParams).c_str();
551 // TODO: end of above
553 auto cLock = m_csdkLock.lock();
557 std::lock_guard<std::recursive_mutex> lock(*cLock);
558 OCHeaderOption options[MAX_HEADER_OPTIONS];
561 assembleHeaderOptions(options, headerOptions);
562 result = OCDoResource(&handle, OC_REST_POST,
563 os.str().c_str(), nullptr,
564 assembleSetResourcePayload(rep).c_str(),
565 static_cast<OCQualityOfService>(QoS),
566 &cbdata, options, headerOptions.size());
570 result = OC_STACK_ERROR;
577 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
578 const std::string& uri, const OCRepresentation& rep,
579 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
580 PutCallback& callback, QualityOfService QoS)
582 OCStackResult result;
583 OCCallbackData cbdata = {0};
585 SetContext* ctx = new SetContext();
586 ctx->callback = callback;
587 cbdata.cb = &setResourceCallback;
588 cbdata.cd = [](void* c){delete static_cast<SetContext*>(c);};
589 cbdata.context = static_cast<void*>(ctx);
591 // TODO: in the future the cstack should be combining these two strings!
593 os << host << assembleSetResourceUri(uri, queryParams).c_str();
594 // TODO: end of above
596 auto cLock = m_csdkLock.lock();
600 std::lock_guard<std::recursive_mutex> lock(*cLock);
602 OCHeaderOption options[MAX_HEADER_OPTIONS];
604 assembleHeaderOptions(options, headerOptions);
605 result = OCDoResource(&handle, OC_REST_PUT,
606 os.str().c_str(), nullptr,
607 assembleSetResourcePayload(rep).c_str(),
608 static_cast<OCQualityOfService>(QoS),
610 options, headerOptions.size());
614 result = OC_STACK_ERROR;
622 DeleteCallback callback;
625 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
626 OCClientResponse* clientResponse)
628 DeleteContext* context = static_cast<DeleteContext*>(ctx);
629 OCRepresentation attrs;
630 HeaderOptions serverHeaderOptions;
632 if(clientResponse->result == OC_STACK_OK)
634 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
636 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
638 return OC_STACK_DELETE_TRANSACTION;
641 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
642 const std::string& uri, const HeaderOptions& headerOptions,
643 DeleteCallback& callback, QualityOfService QoS)
645 OCStackResult result;
646 OCCallbackData cbdata = {0};
648 DeleteContext* ctx = new DeleteContext();
649 ctx->callback = callback;
650 cbdata.cb = &deleteResourceCallback;
651 cbdata.cd = [](void* c){delete static_cast<DeleteContext*>(c);};
652 cbdata.context = static_cast<void*>(ctx);
657 auto cLock = m_csdkLock.lock();
661 OCHeaderOption options[MAX_HEADER_OPTIONS];
664 assembleHeaderOptions(options, headerOptions);
666 std::lock_guard<std::recursive_mutex> lock(*cLock);
668 result = OCDoResource(&handle, OC_REST_DELETE,
669 os.str().c_str(), nullptr,
670 nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
671 &cbdata, options, headerOptions.size());
675 result = OC_STACK_ERROR;
681 struct ObserveContext
683 ObserveCallback callback;
686 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
687 OCClientResponse* clientResponse)
689 ObserveContext* context = static_cast<ObserveContext*>(ctx);
690 OCRepresentation attrs;
691 HeaderOptions serverHeaderOptions;
692 uint32_t sequenceNumber = clientResponse->sequenceNumber;
694 if(clientResponse->result == OC_STACK_OK)
696 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
697 attrs = parseGetSetCallback(clientResponse);
699 std::thread exec(context->callback, serverHeaderOptions, attrs,
700 clientResponse->result, sequenceNumber);
702 return OC_STACK_KEEP_TRANSACTION;
705 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
706 const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
707 const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
709 OCStackResult result;
710 OCCallbackData cbdata = {0};
712 ObserveContext* ctx = new ObserveContext();
713 ctx->callback = callback;
714 cbdata.context = static_cast<void*>(ctx);
715 cbdata.cb = &observeResourceCallback;
716 cbdata.cd = [](void* c){delete static_cast<ObserveContext*>(c);};
719 if (observeType == ObserveType::Observe)
721 method = OC_REST_OBSERVE;
723 else if (observeType == ObserveType::ObserveAll)
725 method = OC_REST_OBSERVE_ALL;
729 method = OC_REST_OBSERVE_ALL;
732 auto cLock = m_csdkLock.lock();
736 std::ostringstream os;
737 os << host << assembleSetResourceUri(uri, queryParams).c_str();
739 std::lock_guard<std::recursive_mutex> lock(*cLock);
740 OCHeaderOption options[MAX_HEADER_OPTIONS];
742 assembleHeaderOptions(options, headerOptions);
743 result = OCDoResource(handle, method,
744 os.str().c_str(), nullptr,
746 static_cast<OCQualityOfService>(QoS),
748 options, headerOptions.size());
752 return OC_STACK_ERROR;
758 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
759 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
760 QualityOfService QoS)
762 OCStackResult result;
763 auto cLock = m_csdkLock.lock();
767 std::lock_guard<std::recursive_mutex> lock(*cLock);
768 OCHeaderOption options[MAX_HEADER_OPTIONS];
770 assembleHeaderOptions(options, headerOptions);
771 result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options, headerOptions.size());
775 result = OC_STACK_ERROR;
781 struct SubscribePresenceContext
783 SubscribeCallback callback;
786 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
787 OCClientResponse* clientResponse)
789 SubscribePresenceContext* context = static_cast<SubscribePresenceContext*>(ctx);
790 std::thread exec(context->callback, clientResponse->result, clientResponse->sequenceNumber);
793 return OC_STACK_KEEP_TRANSACTION;
796 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
797 const std::string& host, SubscribeCallback& presenceHandler)
799 OCCallbackData cbdata = {0};
801 SubscribePresenceContext* ctx = new SubscribePresenceContext();
802 ctx->callback = presenceHandler;
803 cbdata.cb = &subscribePresenceCallback;
804 cbdata.context = static_cast<void*>(ctx);
805 cbdata.cd = [](void* c){delete static_cast<SubscribePresenceContext*>(c);};
806 auto cLock = m_csdkLock.lock();
808 std::ostringstream os;
810 os << host << "/oc/presence";
813 return OC_STACK_ERROR;
815 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
816 OC_LOW_QOS, &cbdata, NULL, 0);
819 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
821 OCStackResult result;
822 auto cLock = m_csdkLock.lock();
826 std::lock_guard<std::recursive_mutex> lock(*cLock);
827 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
831 result = OC_STACK_ERROR;
837 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
843 void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
844 const HeaderOptions& headerOptions)
848 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
850 options[i].protocolID = OC_COAP_ID;
851 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
852 options[i].optionLength = (it->getOptionData()).length() + 1;
853 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
854 (it->getOptionData()).length() + 1);