1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
23 #include "InProcClientWrapper.h"
26 #include "OCPlatform.h"
27 #include "OCResource.h"
// Constructs the in-process client wrapper.
// `owner` is forwarded to the IClientWrapper base, m_threadRun starts out false and
// the weak reference to the shared C SDK mutex is stored in m_csdkLock.
// In ModeType::Client mode the C stack is initialized through OCInit with the
// configured IP address and port; a result other than OC_STACK_OK raises
// InitializeException carrying STACK_INIT_ERROR and that result.
// NOTE(review): the initializer for m_cfg and any assignment that switches
// m_threadRun on are not visible in this view - confirm against the full file.
33 InProcClientWrapper::InProcClientWrapper(OC::OCPlatform_impl& owner,
34 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
35 : IClientWrapper(owner),
36 m_threadRun(false), m_csdkLock(csdkLock),
40 // if the config type is server, we ought to never get called. If the config type
41 // is both, we count on the server to run the thread and do the initialize
42 if(m_cfg.mode == ModeType::Client)
44 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
46 if(OC_STACK_OK != result)
48 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
// Start the background thread that executes listeningFunc on this object.
52 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Destructor: when the listening thread was marked as running and is joinable,
// blocks until it has finished.
// NOTE(review): no statement that asks the thread to stop is visible before the
// join() in this view - confirm the thread is signalled first, otherwise this blocks.
56 InProcClientWrapper::~InProcClientWrapper()
58 if(m_threadRun && m_listeningThread.joinable())
61 m_listeningThread.join();
// Body of the background listening thread.
// Promotes the weak C SDK lock to a shared_ptr and holds the recursive mutex through a
// lock_guard for the guarded stack call (the call itself is not visible in this view).
// `result` is forced to OC_STACK_ERROR on the fallback path - presumably when the
// lock owner is gone; confirm. A failing result is currently ignored (see TODO).
67 void InProcClientWrapper::listeningFunc()
72 auto cLock = m_csdkLock.lock();
75 std::lock_guard<std::recursive_mutex> lock(*cLock);
80 result = OC_STACK_ERROR;
83 if(result != OC_STACK_OK)
85 // TODO: do something with result if failed?
// Pause for 10 ms so the thread does not spin at full CPU.
88 // To minimize CPU utilization we may wish to do this with sleep
89 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Renders a device address as text of the form "coap://a.b.c.d:port".
// The formatted text is produced only when both OCDevAddrToIPv4Addr and
// OCDevAddrToPort return 0; OC::Error::INVALID_IP is returned on the failure path.
95 std::string convertOCAddrToString(OCDevAddr& addr)
97 // TODO: we currently assume this is an IPv4 address, need to figure out the actual value
102 if(OCDevAddrToIPv4Addr(&addr, &a, &b, &c, &d) ==0 && OCDevAddrToPort(&addr, &port)==0)
// The octets and the port are cast to int so they print as numbers, not characters.
105 os << "coap://"<<(int)a<<'.'<<(int)b<<'.'<<(int)c<<'.'<<(int)d<<':'<<(int)port;
110 return OC::Error::INVALID_IP;
// Context passed through the C stack to listenCallback. Presumably these are the
// members of ListenContext - the struct header is not visible in this view.
// User callback that receives each discovered resource.
116 FindCallback callback;
// Wrapper that listenCallback uses to build OCResource objects.
117 IClientWrapper::Ptr clientWrapper;
// Builds an OCResource from one resource node of a discovery response.
//   clientWrapper - wrapper handed to the new OCResource.
//   host          - host string handed to the new OCResource.
//   resourceNode  - property tree describing the resource.
// Reads the URI (URIKEY, default "") and the observable flag (OBSERVABLEKEY equal to 1),
// then collects resource types and interfaces from the property subtree
// (PROPERTYKEY -> RESOURCETYPESKEY / INTERFACESKEY). Missing subtrees default to empty.
// Returns a shared_ptr owning the new OCResource.
// NOTE(review): resourceNode is taken by value, so the whole tree is copied per call.
121 std::shared_ptr<OCResource> InProcClientWrapper::parseOCResource(
122 IClientWrapper::Ptr clientWrapper, const std::string& host,
123 const boost::property_tree::ptree resourceNode)
125 std::string uri = resourceNode.get<std::string>(OC::Key::URIKEY, "");
126 bool obs = resourceNode.get<int>(OC::Key::OBSERVABLEKEY,0) == 1;
127 std::vector<std::string> rTs;
128 std::vector<std::string> ifaces;
130 boost::property_tree::ptree properties =
131 resourceNode.get_child(OC::Key::PROPERTYKEY, boost::property_tree::ptree());
133 boost::property_tree::ptree rT =
134 properties.get_child(OC::Key::RESOURCETYPESKEY, boost::property_tree::ptree());
// Each entry's data string is one resource type name.
137 rTs.push_back(itr.second.data());
140 boost::property_tree::ptree iF =
141 properties.get_child(OC::Key::INTERFACESKEY, boost::property_tree::ptree());
// Each entry's data string is one interface name.
144 ifaces.push_back(itr.second.data());
147 return std::shared_ptr<OCResource>(
148 new OCResource(clientWrapper, host, uri, obs, rTs, ifaces));
// C stack callback for resource discovery responses.
//   ctx            - the ListenContext registered by ListenForResource.
//   clientResponse - response delivered by the C stack.
// A non-OK result or a JSON parse failure is logged and the transaction is kept.
// Otherwise every entry under OCKEY is turned into an OCResource via parseOCResource
// and the user's FindCallback is run with it on a separate std::thread.
// A ResourceInitException raised while building a resource is logged.
// Every visible return yields OC_STACK_KEEP_TRANSACTION.
// NOTE(review): clientResponse is dereferenced without a visible null check.
151 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
152 OCClientResponse* clientResponse)
154 ListenContext* context = static_cast<ListenContext*>(ctx);
156 if(clientResponse->result != OC_STACK_OK)
158 oclog() << "listenCallback(): failed to create resource. clientResponse: "
159 << clientResponse->result
162 return OC_STACK_KEEP_TRANSACTION;
// Feed the raw JSON payload into a stream so read_json can parse it.
165 std::stringstream requestStream;
166 requestStream << clientResponse->resJSONPayload;
168 boost::property_tree::ptree root;
172 boost::property_tree::read_json(requestStream, root);
174 catch(boost::property_tree::json_parser::json_parser_error &e)
176 oclog() << "listenCallback(): read_json() failed: " << e.what()
179 return OC_STACK_KEEP_TRANSACTION;
182 boost::property_tree::ptree payload =
183 root.get_child(OC::Key::OCKEY, boost::property_tree::ptree());
// NOTE(review): iterating by value copies every (key, subtree) pair.
185 for(auto payloadItr : payload)
// The host string is derived from the address the response came from.
189 std::string host = convertOCAddrToString(*clientResponse->addr);
190 std::shared_ptr<OCResource> resource =
191 context->clientWrapper->parseOCResource(context->clientWrapper, host,
194 // Note: the call to detach allows the underlying thread to continue until
195 // completion and allows us to destroy the exec object. This is apparently NOT
196 // a memory leak, as the thread will apparently take care of itself.
197 // Additionally, the only parameter here is a shared ptr, so OCResource will be
198 // disposed of properly upon completion of the callback handler.
199 std::thread exec(context->callback,resource);
202 catch(ResourceInitException& e)
204 oclog() << "listenCallback(): failed to create resource: " << e.what()
209 return OC_STACK_KEEP_TRANSACTION;
// Starts resource discovery by issuing an OC_REST_GET through OCDoResource.
//   resourceType - passed to OCDoResource as the request URI string.
//   callback     - copied into a ListenContext and later run by listenCallback.
//   QoS          - quality of service, cast to the C stack's OCQualityOfService.
// A ListenContext holding the callback and shared_from_this() is heap-allocated and
// handed to the C stack through cbdata.context; cbdata.cd deletes it.
// The request is sent while holding the C SDK mutex; `result` becomes OC_STACK_ERROR
// on the fallback path.
// NOTE(review): serviceUrl is not used in the visible lines.
// NOTE(review): on the fallback path nothing visible deletes `context` - confirm it
// is not leaked when the request is never handed to the stack.
212 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
213 const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
215 OCStackResult result;
217 OCCallbackData cbdata = {0};
219 ListenContext* context = new ListenContext();
220 context->callback = callback;
221 context->clientWrapper = shared_from_this();
223 cbdata.context = static_cast<void*>(context);
224 cbdata.cb = listenCallback;
// Deleter the C stack calls to release the context.
225 cbdata.cd = [](void* c){delete static_cast<ListenContext*>(c);};
227 auto cLock = m_csdkLock.lock();
230 std::lock_guard<std::recursive_mutex> lock(*cLock);
232 result = OCDoResource(&handle, OC_REST_GET,
233 resourceType.c_str(),
235 static_cast<OCQualityOfService>(QoS),
241 result = OC_STACK_ERROR;
// User callback stored in the context of a GET request and run by
// getResourceCallback. Presumably the member of GetContext - the struct header is
// not visible in this view.
248 GetCallback callback;
// User callback stored in the context of a PUT/POST request and run by
// setResourceCallback. Presumably the member of SetContext - the struct header is
// not visible in this view.
253 PutCallback callback;
// Converts the JSON payload of a response into an OCRepresentation.
// Used by the GET, PUT/POST and observe callbacks in this file.
// Returns a default-constructed OCRepresentation when the payload is an empty string
// or cannot be parsed as JSON.
// Otherwise walks the entries under OCKEY. For each entry the URI, the resource
// types/interfaces (PROPERTYKEY) and the attributes (REPKEY) are applied either to the
// root representation or to a child that is appended to `children`. The children are
// attached to the root before it is returned.
// NOTE(review): the condition that selects root versus child is not visible here.
// NOTE(review): resJSONPayload is streamed and passed to strlen without a visible
// null check.
257 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
259 std::stringstream requestStream;
260 requestStream<<clientResponse->resJSONPayload;
// An empty payload yields an empty representation.
261 if(strlen((char*)clientResponse->resJSONPayload) == 0)
263 return OCRepresentation();
266 boost::property_tree::ptree root;
269 boost::property_tree::read_json(requestStream, root);
// A malformed payload also yields an empty representation; the error is not logged.
271 catch(boost::property_tree::json_parser::json_parser_error &e)
273 return OCRepresentation();
275 boost::property_tree::ptree payload = root.get_child(OC::Key::OCKEY, boost::property_tree::ptree());
276 OCRepresentation root_resource;
277 std::vector<OCRepresentation> children;
// NOTE(review): iterating by value copies every (key, subtree) pair.
279 for ( auto payloadItr : payload)
281 OCRepresentation child;
284 auto resourceNode = payloadItr.second;
285 std::string uri = resourceNode.get<std::string>(OC::Key::URIKEY, "");
289 root_resource.setUri(uri);
// Resource types and interfaces are only read when a property subtree exists.
296 if( resourceNode.count(OC::Key::PROPERTYKEY) != 0 )
298 std::vector<std::string> rTs;
299 std::vector<std::string> ifaces;
300 boost::property_tree::ptree properties =
301 resourceNode.get_child(OC::Key::PROPERTYKEY, boost::property_tree::ptree());
303 boost::property_tree::ptree rT =
304 properties.get_child(OC::Key::RESOURCETYPESKEY,
305 boost::property_tree::ptree());
308 rTs.push_back(itr.second.data());
311 boost::property_tree::ptree iF =
312 properties.get_child(OC::Key::INTERFACESKEY, boost::property_tree::ptree());
315 ifaces.push_back(itr.second.data());
319 root_resource.setResourceInterfaces(ifaces);
320 root_resource.setResourceTypes(rTs);
324 child.setResourceInterfaces(ifaces);
325 child.setResourceTypes(rTs);
// Attributes are only read when a representation subtree exists.
329 if( resourceNode.count(OC::Key::REPKEY) != 0 )
331 boost::property_tree::ptree rep =
332 resourceNode.get_child(OC::Key::REPKEY, boost::property_tree::ptree());
// Each item contributes one attribute name and its value as strings.
334 for( auto item : rep)
336 std::string name = item.first.data();
337 std::string value = item.second.data();
342 root_resource.setAttributeMap(attrs);
346 child.setAttributeMap(attrs);
351 children.push_back(child);
361 root_resource.setChildren(children);
363 return root_resource;
// Copies the vendor-specific header options of a response into serverHeaderOptions.
//   clientResponse      - response delivered by the C stack.
//   serverHeaderOptions - output; one HeaderOption::OCHeaderOption is appended per
//                         received option.
// The option data bytes are read as a NUL-terminated C string.
// On the invalid-response path " Invalid response " is printed to std::cout (the
// guarding condition is not visible in this view).
366 void parseServerHeaderOptions(OCClientResponse* clientResponse,
367 HeaderOptions& serverHeaderOptions)
371 // Parse header options from server
373 std::string optionData;
375 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
377 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
378 optionData = reinterpret_cast<const char*>
379 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
380 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
381 serverHeaderOptions.push_back(headerOption);
386 // clientResponse is invalid
387 // TODO check proper logging
388 std::cout << " Invalid response " << std::endl;
// C stack callback for GET responses.
//   ctx            - the GetContext registered by GetResourceRepresentation.
//   clientResponse - response delivered by the C stack.
// On OC_STACK_OK the header options and the payload are parsed. The user's GetCallback
// is run on a new std::thread with the header options, the representation and the
// response result. Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): clientResponse is dereferenced without a visible null check.
392 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
393 OCClientResponse* clientResponse)
395 GetContext* context = static_cast<GetContext*>(ctx);
397 OCRepresentation rep;
398 HeaderOptions serverHeaderOptions;
399 if(clientResponse->result == OC_STACK_OK)
401 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
402 rep = parseGetSetCallback(clientResponse);
405 std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
407 return OC_STACK_DELETE_TRANSACTION;
// Sends a GET request for a resource through OCDoResource.
//   host, uri     - combined with queryParams into the request URI
//                   (host + assembleSetResourceUri(uri, queryParams)).
//   headerOptions - converted into a local OCHeaderOption array.
//   callback      - copied into a heap-allocated GetContext that the C stack owns
//                   through cbdata.context and releases through cbdata.cd.
//   QoS           - quality of service, cast to OCQualityOfService.
// The request is sent while holding the C SDK mutex; `result` becomes OC_STACK_ERROR
// on the fallback path.
// NOTE(review): headerOptions.size() is passed as the option count with no visible
// check against MAX_HEADER_OPTIONS.
// NOTE(review): on the fallback path nothing visible deletes `ctx`.
410 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
411 const std::string& uri, const QueryParamsMap& queryParams,
412 const HeaderOptions& headerOptions, GetCallback& callback,
413 QualityOfService QoS)
415 OCStackResult result;
416 OCCallbackData cbdata = {0};
418 GetContext* ctx = new GetContext();
419 ctx->callback = callback;
420 cbdata.context = static_cast<void*>(ctx);
421 cbdata.cb = &getResourceCallback;
422 cbdata.cd = [](void* c){delete static_cast<GetContext*>(c);};
424 auto cLock = m_csdkLock.lock();
428 std::ostringstream os;
429 os << host << assembleSetResourceUri(uri, queryParams).c_str();
431 std::lock_guard<std::recursive_mutex> lock(*cLock);
433 OCHeaderOption options[MAX_HEADER_OPTIONS];
435 assembleHeaderOptions(options, headerOptions);
436 result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
438 static_cast<OCQualityOfService>(QoS),
440 options, headerOptions.size());
444 result = OC_STACK_ERROR;
// C stack callback shared by PUT and POST responses.
//   ctx            - the SetContext registered by the PUT/POST request methods.
//   clientResponse - response delivered by the C stack.
// When the result is OC_STACK_OK, OC_STACK_RESOURCE_CREATED or
// OC_STACK_RESOURCE_DELETED the header options and the payload are parsed. The user's
// callback is run on a new std::thread with the header options, the representation
// and the response result. Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): clientResponse is dereferenced without a visible null check.
450 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
451 OCClientResponse* clientResponse)
453 SetContext* context = static_cast<SetContext*>(ctx);
454 OCRepresentation attrs;
455 HeaderOptions serverHeaderOptions;
457 if (OC_STACK_OK == clientResponse->result ||
458 OC_STACK_RESOURCE_CREATED == clientResponse->result ||
459 OC_STACK_RESOURCE_DELETED == clientResponse->result)
461 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
462 attrs = parseGetSetCallback(clientResponse);
465 std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
467 return OC_STACK_DELETE_TRANSACTION;
// Builds the path-and-query part of a request URI.
//   uri         - resource path; one trailing '/' is removed.
//   queryParams - appended as "key=value" pairs separated by '&'; the trailing '&'
//                 left by the loop is removed.
// The result is the trimmed uri followed by the query string.
// NOTE(review): uri.back() and queryString.back() are evaluated with no visible
// emptiness check, and back() on an empty std::string is undefined behavior.
// queryString is empty whenever queryParams is empty unless a line not visible here
// prevents it - confirm and guard with !empty() if needed.
470 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
471 const QueryParamsMap& queryParams)
473 if(uri.back() == '/')
475 uri.resize(uri.size()-1);
478 ostringstream paramsList;
479 if(queryParams.size() > 0)
484 for(auto& param : queryParams)
486 paramsList << param.first <<'='<<param.second<<'&';
489 std::string queryString = paramsList.str();
490 if(queryString.back() == '&')
492 queryString.resize(queryString.size() - 1);
495 std::string ret = uri + queryString;
// Builds the JSON request body for PUT/POST: the text {"oc": followed by the
// representation's own JSON (rep.getJSONRepresentation()).
// NOTE(review): the text that closes the JSON object is not visible in this view.
// Returns the assembled payload string.
499 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
501 ostringstream payload;
502 // TODO need to change the format to "{"oc":[]}"
503 payload << "{\"oc\":";
505 payload << rep.getJSONRepresentation();
508 return payload.str();
// Sends a POST request carrying `rep` through OCDoResource.
//   host, uri     - combined with queryParams into the request URI.
//   rep           - serialized by assembleSetResourcePayload into the request body.
//   headerOptions - converted into a local OCHeaderOption array.
//   callback      - copied into a heap-allocated SetContext that the C stack owns
//                   through cbdata.context and releases through cbdata.cd.
//   QoS           - quality of service, cast to OCQualityOfService.
// The request is sent while holding the C SDK mutex; `result` becomes OC_STACK_ERROR
// on the fallback path.
// NOTE(review): headerOptions.size() is passed as the option count with no visible
// check against MAX_HEADER_OPTIONS.
// NOTE(review): on the fallback path nothing visible deletes `ctx`.
511 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
512 const std::string& uri, const OCRepresentation& rep,
513 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
514 PostCallback& callback, QualityOfService QoS)
516 OCStackResult result;
517 OCCallbackData cbdata = {0};
519 SetContext* ctx = new SetContext();
520 ctx->callback = callback;
521 cbdata.cb = &setResourceCallback;
522 cbdata.cd = [](void* c){delete static_cast<SetContext*>(c);};
523 cbdata.context = static_cast<void*>(ctx);
525 // TODO: in the future the cstack should be combining these two strings!
527 os << host << assembleSetResourceUri(uri, queryParams).c_str();
528 // TODO: end of above
530 auto cLock = m_csdkLock.lock();
534 std::lock_guard<std::recursive_mutex> lock(*cLock);
535 OCHeaderOption options[MAX_HEADER_OPTIONS];
538 assembleHeaderOptions(options, headerOptions);
539 result = OCDoResource(&handle, OC_REST_POST,
540 os.str().c_str(), nullptr,
541 assembleSetResourcePayload(rep).c_str(),
542 static_cast<OCQualityOfService>(QoS),
543 &cbdata, options, headerOptions.size());
547 result = OC_STACK_ERROR;
// Sends a PUT request carrying `rep` through OCDoResource.
//   host, uri     - combined with queryParams into the request URI.
//   rep           - serialized by assembleSetResourcePayload into the request body.
//   headerOptions - converted into a local OCHeaderOption array.
//   callback      - copied into a heap-allocated SetContext that the C stack owns
//                   through cbdata.context and releases through cbdata.cd.
//   QoS           - quality of service, cast to OCQualityOfService.
// The request is sent while holding the C SDK mutex; `result` becomes OC_STACK_ERROR
// on the fallback path.
// NOTE(review): headerOptions.size() is passed as the option count with no visible
// check against MAX_HEADER_OPTIONS.
// NOTE(review): on the fallback path nothing visible deletes `ctx`.
554 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
555 const std::string& uri, const OCRepresentation& rep,
556 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
557 PutCallback& callback, QualityOfService QoS)
559 OCStackResult result;
560 OCCallbackData cbdata = {0};
562 SetContext* ctx = new SetContext();
563 ctx->callback = callback;
564 cbdata.cb = &setResourceCallback;
565 cbdata.cd = [](void* c){delete static_cast<SetContext*>(c);};
566 cbdata.context = static_cast<void*>(ctx);
568 // TODO: in the future the cstack should be combining these two strings!
570 os << host << assembleSetResourceUri(uri, queryParams).c_str();
571 // TODO: end of above
573 auto cLock = m_csdkLock.lock();
577 std::lock_guard<std::recursive_mutex> lock(*cLock);
579 OCHeaderOption options[MAX_HEADER_OPTIONS];
581 assembleHeaderOptions(options, headerOptions);
582 result = OCDoResource(&handle, OC_REST_PUT,
583 os.str().c_str(), nullptr,
584 assembleSetResourcePayload(rep).c_str(),
585 static_cast<OCQualityOfService>(QoS),
587 options, headerOptions.size());
591 result = OC_STACK_ERROR;
// User callback stored in the context of a DELETE request and run by
// deleteResourceCallback. Presumably the member of DeleteContext - the struct header
// is not visible in this view.
599 DeleteCallback callback;
// C stack callback for DELETE responses.
//   ctx            - the DeleteContext registered by DeleteResource.
//   clientResponse - response delivered by the C stack.
// On OC_STACK_OK the header options are parsed. The user's DeleteCallback is run on a
// new std::thread with the header options and the response result.
// Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): `attrs` is not used in the visible lines.
602 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
603 OCClientResponse* clientResponse)
605 DeleteContext* context = static_cast<DeleteContext*>(ctx);
606 OCRepresentation attrs;
607 HeaderOptions serverHeaderOptions;
609 if(clientResponse->result == OC_STACK_OK)
611 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
613 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
615 return OC_STACK_DELETE_TRANSACTION;
// Sends a DELETE request through OCDoResource with no payload.
//   headerOptions - converted into a local OCHeaderOption array.
//   callback      - copied into a heap-allocated DeleteContext that the C stack owns
//                   through cbdata.context and releases through cbdata.cd.
// The request is sent while holding the C SDK mutex; `result` becomes OC_STACK_ERROR
// on the fallback path.
// NOTE(review): the request is issued with m_cfg.QoS and the QoS parameter is not
// used in the visible lines, unlike the GET/PUT/POST methods in this file - confirm
// whether the caller's QoS should be honored.
// NOTE(review): how the request URI in `os` is built from host and uri is not
// visible in this view.
// NOTE(review): headerOptions.size() is passed as the option count with no visible
// check against MAX_HEADER_OPTIONS.
618 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
619 const std::string& uri, const HeaderOptions& headerOptions,
620 DeleteCallback& callback, QualityOfService QoS)
622 OCStackResult result;
623 OCCallbackData cbdata = {0};
625 DeleteContext* ctx = new DeleteContext();
626 ctx->callback = callback;
627 cbdata.cb = &deleteResourceCallback;
628 cbdata.cd = [](void* c){delete static_cast<DeleteContext*>(c);};
629 cbdata.context = static_cast<void*>(ctx);
634 auto cLock = m_csdkLock.lock();
638 OCHeaderOption options[MAX_HEADER_OPTIONS];
641 assembleHeaderOptions(options, headerOptions);
643 std::lock_guard<std::recursive_mutex> lock(*cLock);
645 result = OCDoResource(&handle, OC_REST_DELETE,
646 os.str().c_str(), nullptr,
647 nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
648 &cbdata, options, headerOptions.size());
652 result = OC_STACK_ERROR;
// Context handed to the C stack for observe requests and read back in
// observeResourceCallback.
658 struct ObserveContext
// User callback run for every observe notification.
660 ObserveCallback callback;
// C stack callback for observe notifications.
//   ctx            - the ObserveContext registered by ObserveResource.
//   clientResponse - notification delivered by the C stack.
// On OC_STACK_OK the header options and the payload are parsed. The user's
// ObserveCallback is run on a new std::thread with the header options, the
// representation, the response result and the notification's sequence number.
// Returns OC_STACK_KEEP_TRANSACTION so that later notifications are still delivered.
// NOTE(review): clientResponse is dereferenced without a visible null check.
663 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
664 OCClientResponse* clientResponse)
666 ObserveContext* context = static_cast<ObserveContext*>(ctx);
667 OCRepresentation attrs;
668 HeaderOptions serverHeaderOptions;
669 uint32_t sequenceNumber = clientResponse->sequenceNumber;
671 if(clientResponse->result == OC_STACK_OK)
673 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
674 attrs = parseGetSetCallback(clientResponse);
676 std::thread exec(context->callback, serverHeaderOptions, attrs,
677 clientResponse->result, sequenceNumber);
679 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation on a resource through OCDoResource.
//   observeType   - ObserveType::Observe selects OC_REST_OBSERVE; ObserveAll and the
//                   remaining fallback branch both select OC_REST_OBSERVE_ALL.
//   handle        - passed to OCDoResource.
//   host, uri     - combined with queryParams into the request URI.
//   headerOptions - converted into a local OCHeaderOption array.
//   callback      - copied into a heap-allocated ObserveContext that the C stack owns
//                   through cbdata.context and releases through cbdata.cd.
//   QoS           - quality of service, cast to OCQualityOfService.
// The request is sent while holding the C SDK mutex; the fallback path returns
// OC_STACK_ERROR directly.
// NOTE(review): headerOptions.size() is passed as the option count with no visible
// check against MAX_HEADER_OPTIONS.
// NOTE(review): on the fallback path nothing visible deletes `ctx`.
682 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
683 const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
684 const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
686 OCStackResult result;
687 OCCallbackData cbdata = {0};
689 ObserveContext* ctx = new ObserveContext();
690 ctx->callback = callback;
691 cbdata.context = static_cast<void*>(ctx);
692 cbdata.cb = &observeResourceCallback;
693 cbdata.cd = [](void* c){delete static_cast<ObserveContext*>(c);};
696 if (observeType == ObserveType::Observe)
698 method = OC_REST_OBSERVE;
700 else if (observeType == ObserveType::ObserveAll)
702 method = OC_REST_OBSERVE_ALL;
// Fallback for any other observe type.
706 method = OC_REST_OBSERVE_ALL;
709 auto cLock = m_csdkLock.lock();
713 std::ostringstream os;
714 os << host << assembleSetResourceUri(uri, queryParams).c_str();
716 std::lock_guard<std::recursive_mutex> lock(*cLock);
717 OCHeaderOption options[MAX_HEADER_OPTIONS];
719 assembleHeaderOptions(options, headerOptions);
720 result = OCDoResource(handle, method,
721 os.str().c_str(), nullptr,
723 static_cast<OCQualityOfService>(QoS),
725 options, headerOptions.size());
729 return OC_STACK_ERROR;
// Cancels an observation by calling OCCancel on `handle`.
//   handle        - handle of the observation to cancel.
//   headerOptions - converted into a local OCHeaderOption array and sent with the
//                   cancellation.
//   QoS           - quality of service, cast to OCQualityOfService.
// The call is made while holding the C SDK mutex; `result` becomes OC_STACK_ERROR on
// the fallback path.
// NOTE(review): host and uri are not used in the visible lines.
// NOTE(review): headerOptions.size() is passed as the option count with no visible
// check against MAX_HEADER_OPTIONS.
735 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
736 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
737 QualityOfService QoS)
739 OCStackResult result;
740 auto cLock = m_csdkLock.lock();
744 std::lock_guard<std::recursive_mutex> lock(*cLock);
745 OCHeaderOption options[MAX_HEADER_OPTIONS];
747 assembleHeaderOptions(options, headerOptions);
748 result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options, headerOptions.size());
752 result = OC_STACK_ERROR;
// Context handed to the C stack for presence subscriptions and read back in
// subscribePresenceCallback.
758 struct SubscribePresenceContext
// User callback run for every presence notification.
760 SubscribeCallback callback;
// C stack callback for presence notifications.
//   ctx            - the SubscribePresenceContext registered by SubscribePresence.
//   clientResponse - notification delivered by the C stack.
// Runs the user's SubscribeCallback on a new std::thread with the response result and
// sequence number. Returns OC_STACK_KEEP_TRANSACTION so that later notifications are
// still delivered.
// NOTE(review): clientResponse is dereferenced without a visible null check.
763 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
764 OCClientResponse* clientResponse)
766 SubscribePresenceContext* context = static_cast<SubscribePresenceContext*>(ctx);
767 std::thread exec(context->callback, clientResponse->result, clientResponse->sequenceNumber);
770 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications from `host`.
//   handle          - passed to OCDoResource.
//   host            - the request URI is host followed by "/oc/presence".
//   presenceHandler - copied into a heap-allocated SubscribePresenceContext that the
//                     C stack owns through cbdata.context and releases through
//                     cbdata.cd.
// Issues OC_REST_PRESENCE at OC_LOW_QOS with no payload and no header options, and
// returns OCDoResource's result; the early-exit path returns OC_STACK_ERROR.
// NOTE(review): unlike the other request methods in this file, no lock_guard on
// *cLock is visible before OCDoResource - confirm whether the C SDK mutex is meant
// to be held here.
// NOTE(review): on the early-exit path nothing visible deletes `ctx`.
773 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
774 const std::string& host, SubscribeCallback& presenceHandler)
776 OCCallbackData cbdata = {0};
778 SubscribePresenceContext* ctx = new SubscribePresenceContext();
779 ctx->callback = presenceHandler;
780 cbdata.cb = &subscribePresenceCallback;
781 cbdata.context = static_cast<void*>(ctx);
782 cbdata.cd = [](void* c){delete static_cast<SubscribePresenceContext*>(c);};
783 auto cLock = m_csdkLock.lock();
785 std::ostringstream os;
787 os << host << "/oc/presence";
790 return OC_STACK_ERROR;
792 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
793 OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels a presence subscription by calling OCCancel on `handle` at OC_LOW_QOS with
// no header options.
// The call is made while holding the C SDK mutex; `result` becomes OC_STACK_ERROR on
// the fallback path.
796 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
798 OCStackResult result;
799 auto cLock = m_csdkLock.lock();
803 std::lock_guard<std::recursive_mutex> lock(*cLock);
804 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
808 result = OC_STACK_ERROR;
// Entry point for querying the default quality of service.
// NOTE(review): only the signature is visible in this view; presumably the default
// QoS is written to `qos` and a status is returned - confirm against the body.
814 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
// Translates the C++ header options into the C stack's OCHeaderOption array.
//   options       - output array filled entry by entry.
//   headerOptions - source options.
// Each entry gets protocol ID OC_COAP_ID and the option ID narrowed to uint16_t. The
// option data is copied together with its terminating NUL (length() + 1 bytes), and
// the same byte count is stored in optionLength.
// NOTE(review): no bound check on the number of entries or on the capacity of
// optionData is visible here; callers in this file pass arrays of MAX_HEADER_OPTIONS
// elements - confirm that oversized input is rejected somewhere.
820 void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
821 const HeaderOptions& headerOptions)
825 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
827 options[i].protocolID = OC_COAP_ID;
828 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
829 options[i].optionLength = (it->getOptionData()).length() + 1;
830 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
831 (it->getOptionData()).length() + 1);