1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
41 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
43 if(OC_STACK_OK != result)
45 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
49 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
53 InProcClientWrapper::~InProcClientWrapper()
55 if(m_threadRun && m_listeningThread.joinable())
58 m_listeningThread.join();
61 // only stop if we are the ones who actually called 'init'. We are counting
62 // on the server to do the stop.
63 if(m_cfg.mode == ModeType::Client)
69 void InProcClientWrapper::listeningFunc()
74 auto cLock = m_csdkLock.lock();
77 std::lock_guard<std::recursive_mutex> lock(*cLock);
82 result = OC_STACK_ERROR;
85 if(result != OC_STACK_OK)
87 // TODO: do something with result if failed?
90 // To minimize CPU utilization we may wish to do this with sleep
91 std::this_thread::sleep_for(std::chrono::milliseconds(10));
95 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
97 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
99 return OCRepresentation();
105 oc.setJSONRepresentation(clientResponse->resJSONPayload);
107 catch (cereal::RapidJSONException& ex)
109 oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
113 catch (cereal::Exception& ex)
115 oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
120 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121 if(it == oc.representations().end())
123 return OCRepresentation();
126 // first one is considered the root, everything else is considered a child of this one.
127 OCRepresentation root = *it;
130 std::for_each(it, oc.representations().end(),
131 [&root](const OCRepresentation& repItr)
132 {root.addChild(repItr);});
137 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138 OCClientResponse* clientResponse)
140 ClientCallbackContext::ListenContext* context =
141 static_cast<ClientCallbackContext::ListenContext*>(ctx);
143 if(clientResponse->result != OC_STACK_OK)
145 oclog() << "listenCallback(): failed to create resource. clientResponse: "
146 << clientResponse->result
149 return OC_STACK_KEEP_TRANSACTION;
152 auto clientWrapper = context->clientWrapper.lock();
156 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
158 return OC_STACK_KEEP_TRANSACTION;
161 std::stringstream requestStream;
162 requestStream << clientResponse->resJSONPayload;
167 ListenOCContainer container(clientWrapper, *clientResponse->addr,
168 clientResponse->connType, requestStream);
169 // loop to ensure valid construction of all resources
170 for(auto resource : container.Resources())
172 std::thread exec(context->callback, resource);
177 catch(const std::exception& e)
179 oclog() << "listenCallback failed to parse a malformed message: "
182 << clientResponse->resJSONPayload
184 << clientResponse->result
186 return OC_STACK_KEEP_TRANSACTION;
189 return OC_STACK_KEEP_TRANSACTION;
192 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
193 const std::string& resourceType, OCConnectivityType connectivityType,
194 FindCallback& callback, QualityOfService QoS)
198 return OC_STACK_INVALID_PARAM;
201 OCStackResult result;
203 ClientCallbackContext::ListenContext* context =
204 new ClientCallbackContext::ListenContext(callback, shared_from_this());
205 OCCallbackData cbdata(
206 static_cast<void*>(context),
208 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
211 auto cLock = m_csdkLock.lock();
214 std::lock_guard<std::recursive_mutex> lock(*cLock);
215 result = OCDoResource(nullptr, OC_REST_GET,
216 resourceType.c_str(),
217 nullptr, nullptr, connectivityType,
218 static_cast<OCQualityOfService>(QoS),
225 result = OC_STACK_ERROR;
230 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
231 OCClientResponse* clientResponse)
233 ClientCallbackContext::DeviceListenContext* context =
234 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
238 OCRepresentation rep = parseGetSetCallback(clientResponse);
239 std::thread exec(context->callback, rep);
242 catch(OC::OCException& e)
244 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
245 <<e.what() <<std::flush;
248 return OC_STACK_KEEP_TRANSACTION;
251 OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
252 const std::string& deviceURI, OCConnectivityType connectivityType,
253 FindDeviceCallback& callback, QualityOfService QoS)
257 return OC_STACK_INVALID_PARAM;
259 OCStackResult result;
261 ClientCallbackContext::DeviceListenContext* context =
262 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
263 OCCallbackData cbdata(
264 static_cast<void*>(context),
265 listenDeviceCallback,
266 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
269 auto cLock = m_csdkLock.lock();
272 std::lock_guard<std::recursive_mutex> lock(*cLock);
273 result = OCDoResource(nullptr, OC_REST_GET,
275 nullptr, nullptr, connectivityType,
276 static_cast<OCQualityOfService>(QoS),
283 result = OC_STACK_ERROR;
288 void parseServerHeaderOptions(OCClientResponse* clientResponse,
289 HeaderOptions& serverHeaderOptions)
293 // Parse header options from server
295 std::string optionData;
297 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
299 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
300 optionData = reinterpret_cast<const char*>
301 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
302 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
303 serverHeaderOptions.push_back(headerOption);
308 // clientResponse is invalid
309 // TODO check proper logging
310 std::cout << " Invalid response " << std::endl;
314 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
315 OCClientResponse* clientResponse)
317 ClientCallbackContext::GetContext* context =
318 static_cast<ClientCallbackContext::GetContext*>(ctx);
320 OCRepresentation rep;
321 HeaderOptions serverHeaderOptions;
322 OCStackResult result = clientResponse->result;
323 if(result == OC_STACK_OK)
325 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
328 rep = parseGetSetCallback(clientResponse);
330 catch(OC::OCException& e)
336 std::thread exec(context->callback, serverHeaderOptions, rep, result);
338 return OC_STACK_DELETE_TRANSACTION;
341 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
342 const std::string& uri, OCConnectivityType connectivityType,
343 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
344 GetCallback& callback, QualityOfService QoS)
348 return OC_STACK_INVALID_PARAM;
350 OCStackResult result;
351 ClientCallbackContext::GetContext* ctx =
352 new ClientCallbackContext::GetContext(callback);
353 OCCallbackData cbdata(
354 static_cast<void*>(ctx),
356 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
359 auto cLock = m_csdkLock.lock();
363 std::ostringstream os;
364 os << host << assembleSetResourceUri(uri, queryParams).c_str();
366 std::lock_guard<std::recursive_mutex> lock(*cLock);
367 OCHeaderOption options[MAX_HEADER_OPTIONS];
369 result = OCDoResource(nullptr, OC_REST_GET, os.str().c_str(),
370 nullptr, nullptr, connectivityType,
371 static_cast<OCQualityOfService>(QoS),
373 assembleHeaderOptions(options, headerOptions),
374 headerOptions.size());
379 result = OC_STACK_ERROR;
385 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
386 OCClientResponse* clientResponse)
388 ClientCallbackContext::SetContext* context =
389 static_cast<ClientCallbackContext::SetContext*>(ctx);
390 OCRepresentation attrs;
391 HeaderOptions serverHeaderOptions;
393 OCStackResult result = clientResponse->result;
394 if (OC_STACK_OK == result ||
395 OC_STACK_RESOURCE_CREATED == result ||
396 OC_STACK_RESOURCE_DELETED == result)
398 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
401 attrs = parseGetSetCallback(clientResponse);
403 catch(OC::OCException& e)
409 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
411 return OC_STACK_DELETE_TRANSACTION;
414 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
415 const QueryParamsMap& queryParams)
417 if(uri.back() == '/')
419 uri.resize(uri.size()-1);
422 ostringstream paramsList;
423 if(queryParams.size() > 0)
428 for(auto& param : queryParams)
430 paramsList << param.first <<'='<<param.second<<';';
433 std::string queryString = paramsList.str();
434 if(queryString.back() == ';')
436 queryString.resize(queryString.size() - 1);
439 std::string ret = uri + queryString;
443 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
445 MessageContainer ocInfo;
446 ocInfo.addRepresentation(rep);
447 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
450 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
451 const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
452 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
453 PostCallback& callback, QualityOfService QoS)
457 return OC_STACK_INVALID_PARAM;
459 OCStackResult result;
460 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
461 OCCallbackData cbdata(
462 static_cast<void*>(ctx),
464 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
467 // TODO: in the future the cstack should be combining these two strings!
469 os << host << assembleSetResourceUri(uri, queryParams).c_str();
470 // TODO: end of above
472 auto cLock = m_csdkLock.lock();
476 std::lock_guard<std::recursive_mutex> lock(*cLock);
477 OCHeaderOption options[MAX_HEADER_OPTIONS];
479 result = OCDoResource(nullptr, OC_REST_POST,
480 os.str().c_str(), nullptr,
481 assembleSetResourcePayload(rep).c_str(), connectivityType,
482 static_cast<OCQualityOfService>(QoS),
484 assembleHeaderOptions(options, headerOptions),
485 headerOptions.size());
490 result = OC_STACK_ERROR;
496 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
497 const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
498 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
499 PutCallback& callback, QualityOfService QoS)
503 return OC_STACK_INVALID_PARAM;
505 OCStackResult result;
506 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
507 OCCallbackData cbdata(
508 static_cast<void*>(ctx),
510 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
513 // TODO: in the future the cstack should be combining these two strings!
515 os << host << assembleSetResourceUri(uri, queryParams).c_str();
516 // TODO: end of above
518 auto cLock = m_csdkLock.lock();
522 std::lock_guard<std::recursive_mutex> lock(*cLock);
524 OCHeaderOption options[MAX_HEADER_OPTIONS];
526 result = OCDoResource(&handle, OC_REST_PUT,
527 os.str().c_str(), nullptr,
528 assembleSetResourcePayload(rep).c_str(), connectivityType,
529 static_cast<OCQualityOfService>(QoS),
531 assembleHeaderOptions(options, headerOptions),
532 headerOptions.size());
537 result = OC_STACK_ERROR;
543 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
544 OCClientResponse* clientResponse)
546 ClientCallbackContext::DeleteContext* context =
547 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
548 HeaderOptions serverHeaderOptions;
550 if(clientResponse->result == OC_STACK_OK)
552 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
554 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
556 return OC_STACK_DELETE_TRANSACTION;
559 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
560 const std::string& uri, OCConnectivityType connectivityType,
561 const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
565 return OC_STACK_INVALID_PARAM;
567 OCStackResult result;
568 ClientCallbackContext::DeleteContext* ctx =
569 new ClientCallbackContext::DeleteContext(callback);
570 OCCallbackData cbdata(
571 static_cast<void*>(ctx),
572 deleteResourceCallback,
573 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
579 auto cLock = m_csdkLock.lock();
583 OCHeaderOption options[MAX_HEADER_OPTIONS];
585 std::lock_guard<std::recursive_mutex> lock(*cLock);
587 result = OCDoResource(nullptr, OC_REST_DELETE,
588 os.str().c_str(), nullptr,
589 nullptr, connectivityType,
590 static_cast<OCQualityOfService>(m_cfg.QoS),
592 assembleHeaderOptions(options, headerOptions),
593 headerOptions.size());
598 result = OC_STACK_ERROR;
604 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
605 OCClientResponse* clientResponse)
607 ClientCallbackContext::ObserveContext* context =
608 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
609 OCRepresentation attrs;
610 HeaderOptions serverHeaderOptions;
611 uint32_t sequenceNumber = clientResponse->sequenceNumber;
612 OCStackResult result = clientResponse->result;
613 if(clientResponse->result == OC_STACK_OK)
615 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
618 attrs = parseGetSetCallback(clientResponse);
620 catch(OC::OCException& e)
625 std::thread exec(context->callback, serverHeaderOptions, attrs,
626 result, sequenceNumber);
628 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
630 return OC_STACK_DELETE_TRANSACTION;
632 return OC_STACK_KEEP_TRANSACTION;
635 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
636 const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
637 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
638 ObserveCallback& callback, QualityOfService QoS)
642 return OC_STACK_INVALID_PARAM;
644 OCStackResult result;
646 ClientCallbackContext::ObserveContext* ctx =
647 new ClientCallbackContext::ObserveContext(callback);
648 OCCallbackData cbdata(
649 static_cast<void*>(ctx),
650 observeResourceCallback,
651 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
655 if (observeType == ObserveType::Observe)
657 method = OC_REST_OBSERVE;
659 else if (observeType == ObserveType::ObserveAll)
661 method = OC_REST_OBSERVE_ALL;
665 method = OC_REST_OBSERVE_ALL;
668 auto cLock = m_csdkLock.lock();
672 std::ostringstream os;
673 os << host << assembleSetResourceUri(uri, queryParams).c_str();
675 std::lock_guard<std::recursive_mutex> lock(*cLock);
676 OCHeaderOption options[MAX_HEADER_OPTIONS];
678 result = OCDoResource(handle, method,
679 os.str().c_str(), nullptr,
680 nullptr, connectivityType,
681 static_cast<OCQualityOfService>(QoS),
683 assembleHeaderOptions(options, headerOptions),
684 headerOptions.size());
689 return OC_STACK_ERROR;
695 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
696 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
697 QualityOfService QoS)
699 OCStackResult result;
700 auto cLock = m_csdkLock.lock();
704 std::lock_guard<std::recursive_mutex> lock(*cLock);
705 OCHeaderOption options[MAX_HEADER_OPTIONS];
707 result = OCCancel(handle,
708 static_cast<OCQualityOfService>(QoS),
709 assembleHeaderOptions(options, headerOptions),
710 headerOptions.size());
714 result = OC_STACK_ERROR;
720 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
721 OCClientResponse* clientResponse)
730 if(OCDevAddrToIPv4Addr(clientResponse->addr, &a, &b, &c, &d) == 0 &&
731 OCDevAddrToPort(clientResponse->addr, &port) == 0)
733 os<<static_cast<int>(a)<<"."<<static_cast<int>(b)<<"."<<static_cast<int>(c)
734 <<"."<<static_cast<int>(d)<<":"<<static_cast<int>(port);
736 ClientCallbackContext::SubscribePresenceContext* context =
737 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
739 std::thread exec(context->callback, clientResponse->result,
740 clientResponse->sequenceNumber, os.str());
746 oclog() << "subscribePresenceCallback(): OCDevAddrToIPv4Addr() or OCDevAddrToPort() "
747 <<"failed"<< std::flush;
749 return OC_STACK_KEEP_TRANSACTION;
752 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
753 const std::string& host, const std::string& resourceType,
754 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
758 return OC_STACK_INVALID_PARAM;
761 ClientCallbackContext::SubscribePresenceContext* ctx =
762 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
763 OCCallbackData cbdata(
764 static_cast<void*>(ctx),
765 subscribePresenceCallback,
767 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
770 auto cLock = m_csdkLock.lock();
772 std::ostringstream os;
773 os << host << OC_PRESENCE_URI;
775 if(!resourceType.empty())
777 os << "?rt=" << resourceType;
783 return OC_STACK_ERROR;
786 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
787 connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
790 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
792 OCStackResult result;
793 auto cLock = m_csdkLock.lock();
797 std::lock_guard<std::recursive_mutex> lock(*cLock);
798 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
802 result = OC_STACK_ERROR;
808 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
814 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
815 const HeaderOptions& headerOptions)
819 if( headerOptions.size() == 0)
824 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
826 options[i] = OCHeaderOption(OC_COAP_ID,
828 it->getOptionData().length() + 1,
829 reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));
830 //options[i].protocolID = OC_COAP_ID;
831 //options[i].optionID = static_cast<uint16_t>(it->getOptionID());
832 //options[i].optionLength = (it->getOptionData()).length() + 1;
833 //memcpy(options[i].optionData, (it->getOptionData()).c_str(),
834 // (it->getOptionData()).length() + 1);