1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
23 #include "InProcClientWrapper.h"
26 #include "OCPlatform.h"
27 #include "OCResource.h"
// Constructs the in-process client wrapper.
//   owner    - platform object forwarded to the IClientWrapper base class.
//   csdkLock - weak reference to the recursive mutex, stored in m_csdkLock.
//   cfg      - platform configuration; presumably stored in m_cfg, whose mode, ipAddress and
//              port are read below (the remaining initializers are not visible here).
// In ModeType::Client mode the C stack is initialized through OCInit() as OC_CLIENT and an
// InitializeException carrying the stack result is thrown when that does not yield OC_STACK_OK.
// A std::thread running listeningFunc() is then assigned to m_listeningThread.
// NOTE(review): confirm whether the thread start is inside the Client-mode branch and where
// m_threadRun is switched on; those lines are not visible here.
33 InProcClientWrapper::InProcClientWrapper(OC::OCPlatform& owner,
34 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
35 : IClientWrapper(owner),
36 m_threadRun(false), m_csdkLock(csdkLock),
40 // if the config type is server, we ought to never get called. If the config type
41 // is both, we count on the server to run the thread and do the initialize
42 if(m_cfg.mode == ModeType::Client)
44 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
46 if(OC_STACK_OK != result)
48 throw InitializeException("Error Initializing Stack", result);
// The listening thread is bound to this object, so it must not outlive it.
52 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Destructor: waits for the listening thread to finish, but only when m_threadRun is set and
// the thread object is joinable.
// NOTE(review): whatever tells listeningFunc() to stop is not visible here — confirm that
// m_threadRun is cleared before the join, otherwise the join would block.
56 InProcClientWrapper::~InProcClientWrapper()
58 if(m_threadRun && m_listeningThread.joinable())
61 m_listeningThread.join();
// Thread body used by m_listeningThread.
// Visible steps: promote the weak m_csdkLock to a shared_ptr, hold the recursive mutex through
// a lock_guard, fall back to OC_STACK_ERROR on the alternative path, and sleep for 10 ms.
// NOTE(review): the loop construct and the stack call that normally assigns `result` are not
// visible here — presumably the stack is polled while m_threadRun is set; confirm.
67 void InProcClientWrapper::listeningFunc()
// lock() yields an empty shared_ptr when the mutex owner has already been destroyed.
72 auto cLock = m_csdkLock.lock();
75 std::lock_guard<std::recursive_mutex> lock(*cLock);
80 result = OC_STACK_ERROR;
83 if(result != OC_STACK_OK)
85 // TODO: do something with result if failed?
88 // To minimize CPU utilization we may wish to do this with sleep
89 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Formats a device address as a "coap://a.b.c.d:port" string.
//   addr - device address; its four IPv4 octets and its port are extracted with
//          OCDevAddrToIPv4Addr() and OCDevAddrToPort().
// The URI text is only written when both helper calls return 0.
// NOTE(review): the value returned on the failure path is not visible here — confirm.
95 std::string convertOCAddrToString(OCDevAddr& addr)
97 // TODO: we currently assume this is an IPv4 address, need to figure out the actual value
102 if(OCDevAddrToIPv4Addr(&addr, &a, &b, &c, &d) ==0 && OCDevAddrToPort(&addr, &port)==0)
// The casts to int make the stream print the octets and port as numbers.
105 os << "coap://"<<(int)a<<'.'<<(int)b<<'.'<<(int)c<<'.'<<(int)d<<':'<<(int)port;
// Members of the per-request context used by listenCallback() and ListenForResource()
// (accessed there through a ListenContext pointer).
// callback: user handler invoked with each discovered resource.
116 FindCallback callback;
// clientWrapper: wrapper passed on to parseOCResource() when resources are created.
117 IClientWrapper::Ptr clientWrapper;
// owner: address of the owning platform; not owned by this context.
118 OC::OCPlatform const* owner; // observing ptr
// JSON member names looked up in the property trees parsed from server payloads.
// Resource URI.
122 const std::string URIKEY = "href";
// Observable flag (compared against 1 in parseOCResource()).
123 const std::string OBSERVABLEKEY = "obs";
// Resource types list, read from inside the "prop" subtree.
124 const std::string RESOURCETYPESKEY= "rt";
// Interfaces list, read from inside the "prop" subtree.
125 const std::string INTERFACESKEY = "if";
// Subtree holding the resource types and interfaces.
126 const std::string PROPERTYKEY = "prop";
// Subtree holding the representation attributes.
127 const std::string REPKEY = "rep";
// Builds an OCResource from one resource entry of a discovery response.
//   clientWrapper - wrapper handed to the new OCResource.
//   host          - host string handed to the new OCResource.
//   resourceNode  - property tree of the entry. Reads "href" (URI, default ""), "obs"
//                   (observable when equal to 1, default 0) and the "prop" subtree, whose
//                   "rt" and "if" children provide the resource types and interfaces.
// Returns a shared_ptr owning the newly allocated OCResource.
// Absent subtrees fall back to an empty ptree, so missing keys produce empty lists.
// NOTE(review): resourceNode is taken by value, which copies the tree on every call.
129 std::shared_ptr<OCResource> InProcClientWrapper::parseOCResource(
130 IClientWrapper::Ptr clientWrapper, const std::string& host,
131 const boost::property_tree::ptree resourceNode)
133 std::string uri = resourceNode.get<std::string>(URIKEY, "");
134 bool obs = resourceNode.get<int>(OBSERVABLEKEY,0) == 1;
135 std::vector<std::string> rTs;
136 std::vector<std::string> ifaces;
138 boost::property_tree::ptree properties =
139 resourceNode.get_child(PROPERTYKEY, boost::property_tree::ptree());
141 boost::property_tree::ptree rT =
142 properties.get_child(RESOURCETYPESKEY, boost::property_tree::ptree());
// Collects the data string of each resource-type entry.
145 rTs.push_back(itr.second.data());
148 boost::property_tree::ptree iF =
149 properties.get_child(INTERFACESKEY, boost::property_tree::ptree());
// Collects the data string of each interface entry.
152 ifaces.push_back(itr.second.data());
155 return std::shared_ptr<OCResource>(
156 new OCResource(clientWrapper, host, uri, obs, rTs, ifaces));
// C-stack callback for resource discovery responses.
//   ctx            - the ListenContext registered with the request.
//   handle         - stack handle of the request (not referenced in the visible lines).
//   clientResponse - response; its result, resJSONPayload and addr fields are read.
// A non-OK result or a JSON parse failure is logged and the transaction is kept.
// Otherwise the children of the "oc" node are iterated; an OCResource is created through
// parseOCResource() and handed to context->callback on a new std::thread.
// ResourceInitException is caught and logged.
// Every visible return yields OC_STACK_KEEP_TRANSACTION.
159 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
160 OCClientResponse* clientResponse)
162 ListenContext* context = static_cast<ListenContext*>(ctx);
164 if(clientResponse->result != OC_STACK_OK)
166 oclog() << "listenCallback(): failed to create resource. clientResponse: "
167 << clientResponse->result
170 return OC_STACK_KEEP_TRANSACTION;
// The payload is copied into a stream because read_json() consumes an istream.
173 std::stringstream requestStream;
174 requestStream << clientResponse->resJSONPayload;
176 boost::property_tree::ptree root;
180 boost::property_tree::read_json(requestStream, root);
182 catch(boost::property_tree::json_parser::json_parser_error &e)
184 oclog() << "listenCallback(): read_json() failed: " << e.what()
187 return OC_STACK_KEEP_TRANSACTION;
// A missing "oc" node yields an empty tree, so the loop below simply does not run.
190 boost::property_tree::ptree payload =
191 root.get_child("oc", boost::property_tree::ptree());
193 for(auto payloadItr : payload)
197 std::string host = convertOCAddrToString(*clientResponse->addr);
198 std::shared_ptr<OCResource> resource =
199 context->clientWrapper->parseOCResource(context->clientWrapper, host,
202 // Note: the call to detach allows the underlying thread to continue until
203 // completion and allows us to destroy the exec object. This is apparently NOT
204 // a memory leak, as the thread will apparently take care of itself.
205 // Additionally, the only parameter here is a shared ptr, so OCResource will be
206 // disposed of properly upon completion of the callback handler.
207 std::thread exec(context->callback,resource);
210 catch(ResourceInitException& e)
212 oclog() << "listenCallback(): failed to create resource: " << e.what()
217 return OC_STACK_KEEP_TRANSACTION;
// Issues a discovery (GET) request and registers listenCallback() for the responses.
//   serviceUrl   - not referenced in the visible lines.
//   resourceType - passed as the third argument of OCDoResource().
//   callback     - copied into a heap-allocated ListenContext handed to the stack.
//   QoS          - cast to OCQualityOfService for the request.
// `result` receives the OCDoResource() status, or OC_STACK_ERROR on the alternative path.
// NOTE(review): the context is allocated with `new` and released only through cbdata.cd; on
// the OC_STACK_ERROR path no release is visible here — confirm it is not leaked.
220 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
221 const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
223 OCStackResult result;
225 OCCallbackData cbdata = {0};
227 ListenContext* context = new ListenContext();
228 context->callback = callback;
229 context->clientWrapper = shared_from_this();
230 context->owner = &m_owner;
232 cbdata.context = static_cast<void*>(context);
233 cbdata.cb = listenCallback;
// Deleter the stack can call to free the context.
234 cbdata.cd = [](void* c){delete static_cast<ListenContext*>(c);};
236 auto cLock = m_csdkLock.lock();
// The stack call below is made while holding the shared recursive mutex.
239 std::lock_guard<std::recursive_mutex> lock(*cLock);
241 result = OCDoResource(&handle, OC_REST_GET,
242 resourceType.c_str(),
244 static_cast<OCQualityOfService>(QoS),
250 result = OC_STACK_ERROR;
// User handler stored per GET request; presumably the member of GetContext that
// getResourceCallback() reads as context->callback — the struct header is not visible here.
257 GetCallback callback;
// User handler stored per PUT/POST request; presumably the member of SetContext that
// setResourceCallback() reads as context->callback — the struct header is not visible here.
262 PutCallback callback;
// Converts the JSON payload of a client response into an OCRepresentation.
//   clientResponse - response whose resJSONPayload is parsed.
// Returns a default-constructed OCRepresentation for an empty payload or a JSON parse error.
// Otherwise the entries of the "oc" node are walked: the URI ("href"), the resource types and
// interfaces (from "prop") and the attributes (from "rep") are applied either to the root
// representation or to a child representation appended to `children`. The children are
// attached to the root before it is returned.
// NOTE(review): the condition selecting root vs. child and the construction of `attrs` are not
// visible here.
// NOTE(review): resJSONPayload is streamed and passed to strlen() with no visible null check.
266 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
268 std::stringstream requestStream;
269 requestStream<<clientResponse->resJSONPayload;
// An empty payload is treated as "no representation" instead of a parse error.
270 if(strlen((char*)clientResponse->resJSONPayload) == 0)
272 return OCRepresentation();
275 boost::property_tree::ptree root;
278 boost::property_tree::read_json(requestStream, root);
// Malformed JSON is swallowed and reported as an empty representation.
280 catch(boost::property_tree::json_parser::json_parser_error &e)
282 return OCRepresentation();
284 boost::property_tree::ptree payload = root.get_child("oc", boost::property_tree::ptree());
285 OCRepresentation root_resource;
286 std::vector<OCRepresentation> children;
288 for ( auto payloadItr : payload)
290 OCRepresentation child;
293 auto resourceNode = payloadItr.second;
294 std::string uri = resourceNode.get<std::string>(URIKEY, "");
298 root_resource.setUri(uri);
// Resource types and interfaces are only read when a "prop" subtree is present.
305 if( resourceNode.count(PROPERTYKEY) != 0 )
307 std::vector<std::string> rTs;
308 std::vector<std::string> ifaces;
309 boost::property_tree::ptree properties =
310 resourceNode.get_child(PROPERTYKEY, boost::property_tree::ptree());
312 boost::property_tree::ptree rT =
313 properties.get_child(RESOURCETYPESKEY, boost::property_tree::ptree());
316 rTs.push_back(itr.second.data());
319 boost::property_tree::ptree iF =
320 properties.get_child(INTERFACESKEY, boost::property_tree::ptree());
323 ifaces.push_back(itr.second.data());
327 root_resource.setResourceInterfaces(ifaces);
328 root_resource.setResourceTypes(rTs);
332 child.setResourceInterfaces(ifaces);
333 child.setResourceTypes(rTs);
// Attributes are only read when a "rep" subtree is present.
337 if( resourceNode.count(REPKEY) != 0 )
339 boost::property_tree::ptree rep =
340 resourceNode.get_child(REPKEY, boost::property_tree::ptree());
342 for( auto item : rep)
344 std::string name = item.first.data();
345 std::string value = item.second.data();
350 root_resource.setAttributeMap(attrs);
354 child.setAttributeMap(attrs);
359 children.push_back(child);
369 root_resource.setChildren(children);
371 return root_resource;
// Copies the vendor-specific header options received from the server into
// serverHeaderOptions.
//   clientResponse      - response providing numRcvdVendorSpecificHeaderOptions entries in
//                         rcvdVendorSpecificHeaderOptions.
//   serverHeaderOptions - output container; one HeaderOption::OCHeaderOption is appended per
//                         received option.
// Each option's data is converted to a std::string through a const char*, so it is read up to
// the first NUL byte.
// On the path marked "clientResponse is invalid", " Invalid response " is printed to std::cout.
374 void parseServerHeaderOptions(OCClientResponse* clientResponse,
375 HeaderOptions& serverHeaderOptions)
379 // Parse header options from server
381 std::string optionData;
383 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
385 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
386 optionData = reinterpret_cast<const char*>
387 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
388 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
389 serverHeaderOptions.push_back(headerOption);
394 // clientResponse is invalid
395 // TODO check proper logging
396 std::cout << " Invalid response " << std::endl;
// C-stack callback for GET responses.
//   ctx            - the GetContext registered with the request.
//   handle         - stack handle of the request (not referenced in the visible lines).
//   clientResponse - response; its result decides whether the payload is parsed.
// For an OC_STACK_OK result the server header options and the representation are parsed.
// context->callback is run on a new std::thread with the header options, the representation
// and the result code.
// Returns OC_STACK_DELETE_TRANSACTION.
400 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
401 OCClientResponse* clientResponse)
403 GetContext* context = static_cast<GetContext*>(ctx);
405 OCRepresentation rep;
406 HeaderOptions serverHeaderOptions;
407 if(clientResponse->result == OC_STACK_OK)
409 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
410 rep = parseGetSetCallback(clientResponse);
// The thread receives its arguments by copy, so the locals above may go out of scope.
413 std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
415 return OC_STACK_DELETE_TRANSACTION;
// Issues a GET request for a resource representation.
//   host          - prefix of the request URI.
//   uri           - resource path, combined with queryParams by assembleSetResourceUri().
//   queryParams   - query parameters appended to the URI.
//   headerOptions - vendor header options converted with assembleHeaderOptions().
//   callback      - copied into a heap-allocated GetContext; invoked by getResourceCallback().
//   QoS           - cast to OCQualityOfService for the request.
// `result` receives the OCDoResource() status, or OC_STACK_ERROR on the alternative path.
// NOTE(review): ctx is released only through cbdata.cd; no release is visible on the
// OC_STACK_ERROR path — confirm it is not leaked.
418 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
419 const std::string& uri, const QueryParamsMap& queryParams,
420 const HeaderOptions& headerOptions, GetCallback& callback,
421 QualityOfService QoS)
423 OCStackResult result;
424 OCCallbackData cbdata = {0};
426 GetContext* ctx = new GetContext();
427 ctx->callback = callback;
428 cbdata.context = static_cast<void*>(ctx);
429 cbdata.cb = &getResourceCallback;
// Deleter the stack can call to free the context.
430 cbdata.cd = [](void* c){delete static_cast<GetContext*>(c);};
432 auto cLock = m_csdkLock.lock();
// Full request URI = host followed by the path with its query string.
436 std::ostringstream os;
437 os << host << assembleSetResourceUri(uri, queryParams).c_str();
439 std::lock_guard<std::recursive_mutex> lock(*cLock);
441 OCHeaderOption options[MAX_HEADER_OPTIONS];
443 assembleHeaderOptions(options, headerOptions);
444 result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
446 static_cast<OCQualityOfService>(QoS),
448 options, headerOptions.size());
452 result = OC_STACK_ERROR;
// C-stack callback shared by PUT and POST requests.
//   ctx            - the SetContext registered with the request.
//   handle         - stack handle of the request (not referenced in the visible lines).
//   clientResponse - response; its result decides whether the payload is parsed.
// The header options and the representation are parsed when the result is OC_STACK_OK,
// OC_STACK_RESOURCE_CREATED or OC_STACK_RESOURCE_DELETED.
// context->callback is run on a new std::thread with the header options, the representation
// and the result code.
// Returns OC_STACK_DELETE_TRANSACTION.
458 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
459 OCClientResponse* clientResponse)
461 SetContext* context = static_cast<SetContext*>(ctx);
462 OCRepresentation attrs;
463 HeaderOptions serverHeaderOptions;
465 if (OC_STACK_OK == clientResponse->result ||
466 OC_STACK_RESOURCE_CREATED == clientResponse->result ||
467 OC_STACK_RESOURCE_DELETED == clientResponse->result)
469 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
470 attrs = parseGetSetCallback(clientResponse);
473 std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
475 return OC_STACK_DELETE_TRANSACTION;
478 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
479 const QueryParamsMap& queryParams)
481 if(uri.back() == '/')
483 uri.resize(uri.size()-1);
486 ostringstream paramsList;
487 if(queryParams.size() > 0)
492 for(auto& param : queryParams)
494 paramsList << param.first <<'='<<param.second<<'&';
497 std::string queryString = paramsList.str();
498 if(queryString.back() == '&')
500 queryString.resize(queryString.size() - 1);
503 std::string ret = uri + queryString;
// Serializes a representation into the JSON request payload.
//   rep - representation whose getJSONRepresentation() output is embedded.
// Writes `{"oc":` followed by the representation's JSON and returns the accumulated string.
// NOTE(review): the text that closes the envelope is not visible here — confirm.
507 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
509 ostringstream payload;
510 // TODO need to change the format to "{"oc":[]}"
511 payload << "{\"oc\":";
513 payload << rep.getJSONRepresentation();
516 return payload.str();
// Issues a POST request carrying a resource representation.
//   host          - prefix of the request URI.
//   uri           - resource path, combined with queryParams by assembleSetResourceUri().
//   rep           - representation serialized by assembleSetResourcePayload() as the payload.
//   queryParams   - query parameters appended to the URI.
//   headerOptions - vendor header options converted with assembleHeaderOptions().
//   callback      - copied into a heap-allocated SetContext; invoked by setResourceCallback().
//   QoS           - cast to OCQualityOfService for the request.
// `result` receives the OCDoResource() status, or OC_STACK_ERROR on the alternative path.
// NOTE(review): ctx is released only through cbdata.cd; no release is visible on the
// OC_STACK_ERROR path — confirm it is not leaked.
519 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
520 const std::string& uri, const OCRepresentation& rep,
521 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
522 PostCallback& callback, QualityOfService QoS)
524 OCStackResult result;
525 OCCallbackData cbdata = {0};
527 SetContext* ctx = new SetContext();
528 ctx->callback = callback;
529 cbdata.cb = &setResourceCallback;
// Deleter the stack can call to free the context.
530 cbdata.cd = [](void* c){delete static_cast<SetContext*>(c);};
531 cbdata.context = static_cast<void*>(ctx);
533 // TODO: in the future the cstack should be combining these two strings!
535 os << host << assembleSetResourceUri(uri, queryParams).c_str();
536 // TODO: end of above
538 auto cLock = m_csdkLock.lock();
542 std::lock_guard<std::recursive_mutex> lock(*cLock);
543 OCHeaderOption options[MAX_HEADER_OPTIONS];
546 assembleHeaderOptions(options, headerOptions);
// The payload temporary lives until the end of the OCDoResource() full-expression.
547 result = OCDoResource(&handle, OC_REST_POST,
548 os.str().c_str(), nullptr,
549 assembleSetResourcePayload(rep).c_str(),
550 static_cast<OCQualityOfService>(QoS),
551 &cbdata, options, headerOptions.size());
555 result = OC_STACK_ERROR;
// Issues a PUT request carrying a resource representation.
//   host          - prefix of the request URI.
//   uri           - resource path, combined with queryParams by assembleSetResourceUri().
//   rep           - representation serialized by assembleSetResourcePayload() as the payload.
//   queryParams   - query parameters appended to the URI.
//   headerOptions - vendor header options converted with assembleHeaderOptions().
//   callback      - copied into a heap-allocated SetContext; invoked by setResourceCallback().
//   QoS           - cast to OCQualityOfService for the request.
// `result` receives the OCDoResource() status, or OC_STACK_ERROR on the alternative path.
// NOTE(review): ctx is released only through cbdata.cd; no release is visible on the
// OC_STACK_ERROR path — confirm it is not leaked.
562 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
563 const std::string& uri, const OCRepresentation& rep,
564 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
565 PutCallback& callback, QualityOfService QoS)
567 OCStackResult result;
568 OCCallbackData cbdata = {0};
570 SetContext* ctx = new SetContext();
571 ctx->callback = callback;
572 cbdata.cb = &setResourceCallback;
// Deleter the stack can call to free the context.
573 cbdata.cd = [](void* c){delete static_cast<SetContext*>(c);};
574 cbdata.context = static_cast<void*>(ctx);
576 // TODO: in the future the cstack should be combining these two strings!
578 os << host << assembleSetResourceUri(uri, queryParams).c_str();
579 // TODO: end of above
581 auto cLock = m_csdkLock.lock();
585 std::lock_guard<std::recursive_mutex> lock(*cLock);
587 OCHeaderOption options[MAX_HEADER_OPTIONS];
589 assembleHeaderOptions(options, headerOptions);
// The payload temporary lives until the end of the OCDoResource() full-expression.
590 result = OCDoResource(&handle, OC_REST_PUT,
591 os.str().c_str(), nullptr,
592 assembleSetResourcePayload(rep).c_str(),
593 static_cast<OCQualityOfService>(QoS),
595 options, headerOptions.size());
599 result = OC_STACK_ERROR;
// User handler stored per DELETE request; presumably the member of DeleteContext that
// deleteResourceCallback() reads as context->callback — the struct header is not visible here.
607 DeleteCallback callback;
// C-stack callback for DELETE responses.
//   ctx            - the DeleteContext registered with the request.
//   handle         - stack handle of the request (not referenced in the visible lines).
//   clientResponse - response; its result decides whether header options are parsed.
// For an OC_STACK_OK result the server header options are parsed.
// context->callback is run on a new std::thread with the header options and the result code.
// Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): `attrs` is declared but not used in the visible lines.
610 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
611 OCClientResponse* clientResponse)
613 DeleteContext* context = static_cast<DeleteContext*>(ctx);
614 OCRepresentation attrs;
615 HeaderOptions serverHeaderOptions;
617 if(clientResponse->result == OC_STACK_OK)
619 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
621 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
623 return OC_STACK_DELETE_TRANSACTION;
// Issues a DELETE request for a resource.
//   host          - NOTE(review): the construction of the request URI (`os`) is not visible
//   uri             here; presumably it combines host and uri — confirm.
//   headerOptions - vendor header options converted with assembleHeaderOptions().
//   callback      - copied into a heap-allocated DeleteContext; invoked by
//                   deleteResourceCallback().
//   QoS           - NOTE(review): not referenced in the visible lines. The request uses
//                   m_cfg.QoS instead, unlike the sibling request functions — confirm intent.
// `result` receives the OCDoResource() status, or OC_STACK_ERROR on the alternative path.
// NOTE(review): ctx is released only through cbdata.cd; no release is visible on the
// OC_STACK_ERROR path — confirm it is not leaked.
626 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
627 const std::string& uri, const HeaderOptions& headerOptions,
628 DeleteCallback& callback, QualityOfService QoS)
630 OCStackResult result;
631 OCCallbackData cbdata = {0};
633 DeleteContext* ctx = new DeleteContext();
634 ctx->callback = callback;
635 cbdata.cb = &deleteResourceCallback;
// Deleter the stack can call to free the context.
636 cbdata.cd = [](void* c){delete static_cast<DeleteContext*>(c);};
637 cbdata.context = static_cast<void*>(ctx);
642 auto cLock = m_csdkLock.lock();
646 OCHeaderOption options[MAX_HEADER_OPTIONS];
649 assembleHeaderOptions(options, headerOptions);
651 std::lock_guard<std::recursive_mutex> lock(*cLock);
653 result = OCDoResource(&handle, OC_REST_DELETE,
654 os.str().c_str(), nullptr,
655 nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
656 &cbdata, options, headerOptions.size());
660 result = OC_STACK_ERROR;
// Per-request context handed to the stack by ObserveResource() and read back in
// observeResourceCallback().
666 struct ObserveContext
// User handler invoked for each observe notification.
668 ObserveCallback callback;
// C-stack callback for observe notifications.
//   ctx            - the ObserveContext registered with the request.
//   handle         - stack handle of the request (not referenced in the visible lines).
//   clientResponse - response; its result and sequenceNumber are read.
// For an OC_STACK_OK result the server header options and the representation are parsed.
// context->callback is run on a new std::thread with the header options, the representation,
// the result code and the sequence number.
// Returns OC_STACK_KEEP_TRANSACTION, so the registration stays active for later notifications.
671 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
672 OCClientResponse* clientResponse)
674 ObserveContext* context = static_cast<ObserveContext*>(ctx);
675 OCRepresentation attrs;
676 HeaderOptions serverHeaderOptions;
677 uint32_t sequenceNumber = clientResponse->sequenceNumber;
679 if(clientResponse->result == OC_STACK_OK)
681 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
682 attrs = parseGetSetCallback(clientResponse);
684 std::thread exec(context->callback, serverHeaderOptions, attrs,
685 clientResponse->result, sequenceNumber);
687 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation on a resource.
//   observeType   - ObserveType::Observe selects OC_REST_OBSERVE; ObserveType::ObserveAll and
//                   the remaining branch both select OC_REST_OBSERVE_ALL.
//   handle        - out-parameter passed to OCDoResource(); used later to cancel.
//   host          - prefix of the request URI.
//   uri           - resource path, combined with queryParams by assembleSetResourceUri().
//   queryParams   - query parameters appended to the URI.
//   headerOptions - vendor header options converted with assembleHeaderOptions().
//   callback      - copied into a heap-allocated ObserveContext; invoked by
//                   observeResourceCallback().
//   QoS           - cast to OCQualityOfService for the request.
// `result` receives the OCDoResource() status; the alternative path returns OC_STACK_ERROR.
// NOTE(review): ctx is released only through cbdata.cd; no release is visible before the
// `return OC_STACK_ERROR` — confirm it is not leaked.
690 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
691 const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
692 const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
694 OCStackResult result;
695 OCCallbackData cbdata = {0};
697 ObserveContext* ctx = new ObserveContext();
698 ctx->callback = callback;
699 cbdata.context = static_cast<void*>(ctx);
700 cbdata.cb = &observeResourceCallback;
// Deleter the stack can call to free the context.
701 cbdata.cd = [](void* c){delete static_cast<ObserveContext*>(c);};
704 if (observeType == ObserveType::Observe)
706 method = OC_REST_OBSERVE;
708 else if (observeType == ObserveType::ObserveAll)
710 method = OC_REST_OBSERVE_ALL;
// Fallback for any other observe type.
714 method = OC_REST_OBSERVE_ALL;
717 auto cLock = m_csdkLock.lock();
721 std::ostringstream os;
722 os << host << assembleSetResourceUri(uri, queryParams).c_str();
724 std::lock_guard<std::recursive_mutex> lock(*cLock);
725 OCHeaderOption options[MAX_HEADER_OPTIONS];
727 assembleHeaderOptions(options, headerOptions);
728 result = OCDoResource(handle, method,
729 os.str().c_str(), nullptr,
731 static_cast<OCQualityOfService>(QoS),
733 options, headerOptions.size());
737 return OC_STACK_ERROR;
// Cancels an observation previously registered with the stack.
//   handle        - request handle passed to OCCancel().
//   host, uri     - not referenced in the visible lines.
//   headerOptions - vendor header options converted with assembleHeaderOptions().
//   QoS           - cast to OCQualityOfService for the cancel request.
// `result` receives the OCCancel() status, or OC_STACK_ERROR on the alternative path.
743 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
744 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
745 QualityOfService QoS)
747 OCStackResult result;
748 auto cLock = m_csdkLock.lock();
752 std::lock_guard<std::recursive_mutex> lock(*cLock);
753 OCHeaderOption options[MAX_HEADER_OPTIONS];
755 assembleHeaderOptions(options, headerOptions);
756 result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options, headerOptions.size());
760 result = OC_STACK_ERROR;
// Per-request context handed to the stack by SubscribePresence() and read back in
// subscribePresenceCallback().
766 struct SubscribePresenceContext
// User handler invoked for each presence notification.
768 SubscribeCallback callback;
// C-stack callback for presence notifications.
//   ctx            - the SubscribePresenceContext registered with the request.
//   handle         - stack handle of the request (not referenced in the visible lines).
//   clientResponse - response; its result and sequenceNumber are forwarded.
// context->callback is run on a new std::thread with the result and the sequence number.
// Returns OC_STACK_KEEP_TRANSACTION, so the subscription stays active.
771 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
772 OCClientResponse* clientResponse)
774 SubscribePresenceContext* context = static_cast<SubscribePresenceContext*>(ctx);
775 std::thread exec(context->callback, clientResponse->result, clientResponse->sequenceNumber);
778 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications from a host.
//   handle          - out-parameter passed to OCDoResource(); used later to unsubscribe.
//   host            - prefix of the request URI; "/oc/presence" is appended.
//   presenceHandler - copied into a heap-allocated SubscribePresenceContext; invoked by
//                     subscribePresenceCallback().
// Returns OC_STACK_ERROR on the early-exit path, otherwise the OCDoResource() status. The
// request is sent with OC_REST_PRESENCE, OC_LOW_QOS and no header options.
// NOTE(review): unlike the other request functions, no lock_guard on *cLock is visible before
// the OCDoResource() call — confirm the stack call is actually serialized.
// NOTE(review): ctx is released only through cbdata.cd; no release is visible before the
// `return OC_STACK_ERROR` — confirm it is not leaked.
781 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
782 const std::string& host, SubscribeCallback& presenceHandler)
784 OCCallbackData cbdata = {0};
786 SubscribePresenceContext* ctx = new SubscribePresenceContext();
787 ctx->callback = presenceHandler;
788 cbdata.cb = &subscribePresenceCallback;
789 cbdata.context = static_cast<void*>(ctx);
// Deleter the stack can call to free the context.
790 cbdata.cd = [](void* c){delete static_cast<SubscribePresenceContext*>(c);};
791 auto cLock = m_csdkLock.lock();
793 std::ostringstream os;
795 os << host << "/oc/presence";
798 return OC_STACK_ERROR;
800 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
801 OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels a presence subscription.
//   handle - request handle passed to OCCancel().
// The cancel is issued with OC_LOW_QOS and no header options while holding the shared
// recursive mutex. `result` receives the OCCancel() status, or OC_STACK_ERROR on the
// alternative path.
804 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
806 OCStackResult result;
807 auto cLock = m_csdkLock.lock();
811 std::lock_guard<std::recursive_mutex> lock(*cLock);
812 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
816 result = OC_STACK_ERROR;
// Reports a quality-of-service value through the `qos` out-parameter and returns a stack
// result code.
// NOTE(review): the body is not visible here; presumably it copies the configured default QoS
// into `qos` — confirm.
822 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
// Converts the C++ header options into the C-stack OCHeaderOption array.
//   options       - destination array; callers in this file pass an array of
//                   MAX_HEADER_OPTIONS elements.
//   headerOptions - source options; for each one the protocol is set to OC_COAP_ID, the option
//                   ID is narrowed to uint16_t, and the option data is copied including its
//                   terminating NUL (length + 1 bytes).
// NOTE(review): the declaration and increment of `i` are not visible here.
// NOTE(review): no check against MAX_HEADER_OPTIONS or against the capacity of
// options[i].optionData is visible — confirm oversized input cannot overflow the buffers.
828 void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
829 const HeaderOptions& headerOptions)
833 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
835 options[i].protocolID = OC_COAP_ID;
836 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
837 options[i].optionLength = (it->getOptionData()).length() + 1;
838 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
839 (it->getOptionData()).length() + 1);