1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
// Builds the in-process client wrapper from the shared C-SDK lock and the platform
// configuration.
//   csdkLock - weak reference to the recursive mutex, stored in m_csdkLock
//   cfg      - platform configuration; the body reads it back through m_cfg
//              (NOTE(review): the initialiser that fills m_cfg is not visible in
//              this excerpt)
// Throws InitializeException(STACK_INIT_ERROR, result) when OCInit() does not return
// OC_STACK_OK.
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
// Bring the C stack up as a client on the configured address and port.
41 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
43 if(OC_STACK_OK != result)
45 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
// Start the worker thread that runs listeningFunc() on this object.
// NOTE(review): presumably still inside the Client-mode branch (see the comment
// above); the braces are not visible in this excerpt.
49 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Destructor: waits for the listening thread, then handles the Client-mode shutdown.
53 InProcClientWrapper::~InProcClientWrapper()
// Join only when the thread was flagged as running and is still joinable.
55 if(m_threadRun && m_listeningThread.joinable())
58 m_listeningThread.join();
61 // only stop if we are the ones who actually called 'init'. We are counting
62 // on the server to do the stop.
63 if(m_cfg.mode == ModeType::Client)
// NOTE(review): the statement guarded by this check is not visible in this excerpt.
// Routine executed by m_listeningThread (started in the constructor).
// It promotes m_csdkLock to a shared_ptr, takes the recursive mutex through a
// lock_guard, and sleeps 10 ms to keep CPU usage down.
// NOTE(review): the loop condition and the stack call that assigns `result` on the
// success path are not visible in this excerpt.
69 void InProcClientWrapper::listeningFunc()
74 auto cLock = m_csdkLock.lock();
77 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Fallback value; presumably used when the lock could not be obtained - confirm.
82 result = OC_STACK_ERROR;
85 if(result != OC_STACK_OK)
87 // TODO: do something with result if failed?
90 // To minimize CPU utilization we may wish to do this with sleep
91 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Converts the JSON payload of a client response into a single OCRepresentation.
//   clientResponse - response whose resJSONPayload is parsed
// Returns a default-constructed OCRepresentation when the payload is null or empty,
// or when parsing produced no representations. Otherwise the first representation
// becomes the root and the others are attached to it as children.
// Throws OCException(INVALID_REPRESENTATION, OC_STACK_INVALID_JSON) when cereal
// rejects the payload; the offending payload is logged first.
95 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
97 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
99 return OCRepresentation();
105 oc.setJSONRepresentation(clientResponse->resJSONPayload);
107 catch (cereal::RapidJSONException& ex)
109 oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
113 catch (cereal::Exception& ex)
115 oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
120 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121 if(it == oc.representations().end())
123 return OCRepresentation();
126 // first one is considered the root, everything else is considered a child of this one.
127 OCRepresentation root = *it;
// NOTE(review): `it` has to be advanced past the root before this call, otherwise
// the root is added as a child of itself; the increment is not visible in this
// excerpt - confirm.
130 std::for_each(it, oc.representations().end(),
131 [&root](const OCRepresentation& repItr)
132 {root.addChild(repItr);});
// C-stack callback for resource discovery responses.
//   ctx            - ClientCallbackContext::ListenContext installed by
//                    ListenForResource
//   handle         - not referenced in the visible lines
//   clientResponse - response whose JSON payload describes the found resources
// Every visible return path yields OC_STACK_KEEP_TRANSACTION.
137 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138 OCClientResponse* clientResponse)
140 ClientCallbackContext::ListenContext* context =
141 static_cast<ClientCallbackContext::ListenContext*>(ctx);
// Failed responses are logged and the transaction is kept.
143 if(clientResponse->result != OC_STACK_OK)
145 oclog() << "listenCallback(): failed to create resource. clientResponse: "
146 << clientResponse->result
149 return OC_STACK_KEEP_TRANSACTION;
// The context only holds a weak reference to the wrapper; promote it before use.
152 auto clientWrapper = context->clientWrapper.lock();
156 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
158 return OC_STACK_KEEP_TRANSACTION;
// Feed the raw JSON payload to the container through a stream.
161 std::stringstream requestStream;
162 requestStream << clientResponse->resJSONPayload;
167 ListenOCContainer container(clientWrapper, *clientResponse->addr,
168 clientResponse->connType, requestStream);
169 // loop to ensure valid construction of all resources
170 for(auto resource : container.Resources())
// Each resource is handed to the user callback on a thread of its own.
// NOTE(review): a std::thread must be detached or joined before it is destroyed;
// that call is not visible in this excerpt - confirm.
172 std::thread exec(context->callback, resource);
// Parsing problems are logged together with the payload and the result code.
177 catch(const std::exception& e)
179 oclog() << "listenCallback failed to parse a malformed message: "
182 << clientResponse->resJSONPayload
184 << clientResponse->result
186 return OC_STACK_KEEP_TRANSACTION;
189 return OC_STACK_KEEP_TRANSACTION;
// Issues a discovery (GET) request through the C stack.
//   serviceUrl       - NOTE(review): not referenced in the visible lines
//   resourceType     - passed as the third OCDoResource() argument, the position
//                      where the other request methods pass the request URI
//   connectivityType - forwarded unchanged to OCDoResource()
//   callback         - copied into the heap-allocated ListenContext
//   QoS              - cast to OCQualityOfService for the request
// `result` receives the OCDoResource() return value; OC_STACK_INVALID_PARAM and
// OC_STACK_ERROR are the other visible outcomes (their guards are not visible here).
192 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
193 const std::string& resourceType, OCConnectivityType connectivityType,
194 FindCallback& callback, QualityOfService QoS)
198 return OC_STACK_INVALID_PARAM;
201 OCStackResult result;
203 OCCallbackData cbdata = {0};
// The context carries the user callback and a reference back to this wrapper.
205 ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
206 context->callback = callback;
207 context->clientWrapper = shared_from_this();
209 cbdata.context = static_cast<void*>(context);
210 cbdata.cb = listenCallback;
// cd deletes the context; presumably invoked by the stack when it drops the
// callback - confirm.
211 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
213 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
216 std::lock_guard<std::recursive_mutex> lock(*cLock);
217 result = OCDoResource(nullptr, OC_REST_GET,
218 resourceType.c_str(),
219 nullptr, nullptr, connectivityType,
220 static_cast<OCQualityOfService>(QoS),
// NOTE(review): nothing visible frees `context` on this error path - confirm it is
// not leaked.
227 result = OC_STACK_ERROR;
// C-stack callback for device discovery responses.
//   ctx            - ClientCallbackContext::DeviceListenContext installed by
//                    ListenForDevice
//   handle         - not referenced in the visible lines
//   clientResponse - response parsed with parseGetSetCallback()
// Returns OC_STACK_KEEP_TRANSACTION.
232 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
233 OCClientResponse* clientResponse)
235 ClientCallbackContext::DeviceListenContext* context =
236 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
240 OCRepresentation rep = parseGetSetCallback(clientResponse);
// The parsed representation is delivered to the user callback on its own thread.
// NOTE(review): the detach/join of `exec` is not visible in this excerpt.
241 std::thread exec(context->callback, rep);
// A response that cannot be parsed is logged and dropped.
244 catch(OC::OCException& e)
246 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
247 <<e.what() <<std::flush;
250 return OC_STACK_KEEP_TRANSACTION;
// Issues a device discovery (GET) request through the C stack.
//   serviceUrl, deviceURI - NOTE(review): neither is referenced in the visible
//                           lines; the URI argument of OCDoResource() is on a line
//                           missing from this excerpt
//   connectivityType      - forwarded unchanged to OCDoResource()
//   callback              - copied into the heap-allocated DeviceListenContext
//   QoS                   - cast to OCQualityOfService for the request
// `result` receives the OCDoResource() return value; OC_STACK_INVALID_PARAM and
// OC_STACK_ERROR are the other visible outcomes (their guards are not visible here).
253 OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
254 const std::string& deviceURI, OCConnectivityType connectivityType,
255 FindDeviceCallback& callback, QualityOfService QoS)
259 return OC_STACK_INVALID_PARAM;
261 OCStackResult result;
263 OCCallbackData cbdata = {0};
// The context carries the user callback and a reference back to this wrapper.
264 ClientCallbackContext::DeviceListenContext* context =
265 new ClientCallbackContext::DeviceListenContext();
266 context->callback = callback;
267 context->clientWrapper = shared_from_this();
268 cbdata.context = static_cast<void*>(context);
269 cbdata.cb = listenDeviceCallback;
// cd deletes the context; presumably invoked by the stack when it drops the
// callback - confirm.
270 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
272 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
275 std::lock_guard<std::recursive_mutex> lock(*cLock);
276 result = OCDoResource(nullptr, OC_REST_GET,
278 nullptr, nullptr, connectivityType,
279 static_cast<OCQualityOfService>(QoS),
// NOTE(review): nothing visible frees `context` on this error path - confirm it is
// not leaked.
286 result = OC_STACK_ERROR;
// Appends the vendor-specific header options received in a response to
// serverHeaderOptions.
//   clientResponse      - response providing numRcvdVendorSpecificHeaderOptions
//                         entries in rcvdVendorSpecificHeaderOptions
//   serverHeaderOptions - output container; one OCHeaderOption is pushed per entry
// When the response is considered invalid, " Invalid response " is written to
// std::cout (the validity check itself is not visible in this excerpt).
291 void parseServerHeaderOptions(OCClientResponse* clientResponse,
292 HeaderOptions& serverHeaderOptions)
296 // Parse header options from server
298 std::string optionData;
300 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
302 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
// The option bytes are viewed as const char* and assigned to a std::string, so
// they are read up to the first NUL byte.
// NOTE(review): the option's length field is not consulted here - confirm the
// data is always NUL-terminated.
303 optionData = reinterpret_cast<const char*>
304 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
305 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
306 serverHeaderOptions.push_back(headerOption);
311 // clientResponse is invalid
312 // TODO check proper logging
313 std::cout << " Invalid response " << std::endl;
// C-stack callback for GET responses.
//   ctx            - ClientCallbackContext::GetContext installed by
//                    GetResourceRepresentation
//   handle         - not referenced in the visible lines
//   clientResponse - response supplying the result code, header options and payload
// Returns OC_STACK_DELETE_TRANSACTION.
317 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
318 OCClientResponse* clientResponse)
320 ClientCallbackContext::GetContext* context =
321 static_cast<ClientCallbackContext::GetContext*>(ctx);
323 OCRepresentation rep;
324 HeaderOptions serverHeaderOptions;
325 OCStackResult result = clientResponse->result;
// Header options and payload are only parsed for successful responses.
326 if(result == OC_STACK_OK)
328 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
331 rep = parseGetSetCallback(clientResponse);
// NOTE(review): the handler body is not visible in this excerpt.
333 catch(OC::OCException& e)
// The user callback runs on its own thread with the options, representation and
// result gathered above.
// NOTE(review): the detach/join of `exec` is not visible in this excerpt.
339 std::thread exec(context->callback, serverHeaderOptions, rep, result);
341 return OC_STACK_DELETE_TRANSACTION;
// Sends a GET request for a resource through the C stack.
//   host, uri        - concatenated (uri first passes through
//                      assembleSetResourceUri with queryParams) into the request URI
//   connectivityType - forwarded unchanged to OCDoResource()
//   queryParams      - query parameters folded into the URI
//   headerOptions    - converted with assembleHeaderOptions(); its size is passed
//                      as the option count
//   callback         - copied into the heap-allocated GetContext
//   QoS              - cast to OCQualityOfService for the request
// `result` receives the OCDoResource() return value; OC_STACK_INVALID_PARAM and
// OC_STACK_ERROR are the other visible outcomes (their guards are not visible here).
344 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
345 const std::string& uri, OCConnectivityType connectivityType,
346 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
347 GetCallback& callback, QualityOfService QoS)
351 return OC_STACK_INVALID_PARAM;
353 OCStackResult result;
354 OCCallbackData cbdata = {0};
356 ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
357 ctx->callback = callback;
358 cbdata.context = static_cast<void*>(ctx);
359 cbdata.cb = &getResourceCallback;
// cd deletes the context; presumably invoked by the stack when it drops the
// callback - confirm.
360 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
362 auto cLock = m_csdkLock.lock();
// Full request URI = host followed by the resource URI with its query string.
366 std::ostringstream os;
367 os << host << assembleSetResourceUri(uri, queryParams).c_str();
369 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Scratch array that assembleHeaderOptions() fills for the C API.
370 OCHeaderOption options[MAX_HEADER_OPTIONS];
372 result = OCDoResource(nullptr, OC_REST_GET, os.str().c_str(),
373 nullptr, nullptr, connectivityType,
374 static_cast<OCQualityOfService>(QoS),
376 assembleHeaderOptions(options, headerOptions),
377 headerOptions.size());
// NOTE(review): nothing visible frees `ctx` on this error path - confirm it is not
// leaked.
382 result = OC_STACK_ERROR;
// C-stack callback shared by the PUT and POST requests.
//   ctx            - ClientCallbackContext::SetContext installed by
//                    PostResourceRepresentation / PutResourceRepresentation
//   handle         - not referenced in the visible lines
//   clientResponse - response supplying the result code, header options and payload
// Returns OC_STACK_DELETE_TRANSACTION.
388 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
389 OCClientResponse* clientResponse)
391 ClientCallbackContext::SetContext* context =
392 static_cast<ClientCallbackContext::SetContext*>(ctx);
393 OCRepresentation attrs;
394 HeaderOptions serverHeaderOptions;
396 OCStackResult result = clientResponse->result;
// OK, RESOURCE_CREATED and RESOURCE_DELETED all count as success for parsing.
397 if (OC_STACK_OK == result ||
398 OC_STACK_RESOURCE_CREATED == result ||
399 OC_STACK_RESOURCE_DELETED == result)
401 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
404 attrs = parseGetSetCallback(clientResponse);
// NOTE(review): the handler body is not visible in this excerpt.
406 catch(OC::OCException& e)
// The user callback runs on its own thread with the options, attributes and result.
// NOTE(review): the detach/join of `exec` is not visible in this excerpt.
412 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
414 return OC_STACK_DELETE_TRANSACTION;
// Builds the request URI from a resource URI and its query parameters.
//   uri         - resource URI, taken by value; a single trailing '/' is removed
//   queryParams - key/value pairs written as "key=value" separated by '&'
// The result is the trimmed uri followed by the query string (`ret`).
// NOTE(review): std::string::back() on an empty string is undefined behaviour and
// no emptiness check is visible for `uri` or `queryString` (the latter looks empty
// whenever queryParams is empty) - confirm.
417 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
418 const QueryParamsMap& queryParams)
420 if(uri.back() == '/')
422 uri.resize(uri.size()-1);
425 ostringstream paramsList;
// NOTE(review): the statement executed when parameters exist is not visible in
// this excerpt.
426 if(queryParams.size() > 0)
431 for(auto& param : queryParams)
433 paramsList << param.first <<'='<<param.second<<'&';
// The loop leaves a dangling '&' after the last pair; strip it.
436 std::string queryString = paramsList.str();
437 if(queryString.back() == '&')
439 queryString.resize(queryString.size() - 1);
442 std::string ret = uri + queryString;
// Serialises a representation into the JSON payload used by PUT/POST requests.
//   rep - representation to send
// Returns the JSON text produced by a MessageContainer holding only `rep`,
// requested with OCInfoFormat::IncludeOC.
446 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
448 MessageContainer ocInfo;
449 ocInfo.addRepresentation(rep);
450 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
// Sends a POST request carrying `rep` through the C stack.
//   host, uri        - concatenated (uri first passes through
//                      assembleSetResourceUri with queryParams) into the request URI
//   connectivityType - forwarded unchanged to OCDoResource()
//   rep              - serialised with assembleSetResourcePayload() as the payload
//   queryParams      - query parameters folded into the URI
//   headerOptions    - converted with assembleHeaderOptions(); its size is passed
//                      as the option count
//   callback         - copied into the heap-allocated SetContext
//   QoS              - cast to OCQualityOfService for the request
// `result` receives the OCDoResource() return value; OC_STACK_INVALID_PARAM and
// OC_STACK_ERROR are the other visible outcomes (their guards are not visible here).
453 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
454 const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
455 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
456 PostCallback& callback, QualityOfService QoS)
460 return OC_STACK_INVALID_PARAM;
462 OCStackResult result;
463 OCCallbackData cbdata = {0};
465 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
466 ctx->callback = callback;
467 cbdata.cb = &setResourceCallback;
// cd deletes the context; presumably invoked by the stack when it drops the
// callback - confirm.
468 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
469 cbdata.context = static_cast<void*>(ctx);
471 // TODO: in the future the cstack should be combining these two strings!
// NOTE(review): the declaration of `os` is not visible in this excerpt.
473 os << host << assembleSetResourceUri(uri, queryParams).c_str();
474 // TODO: end of above
476 auto cLock = m_csdkLock.lock();
480 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Scratch array that assembleHeaderOptions() fills for the C API.
481 OCHeaderOption options[MAX_HEADER_OPTIONS];
483 result = OCDoResource(nullptr, OC_REST_POST,
484 os.str().c_str(), nullptr,
485 assembleSetResourcePayload(rep).c_str(), connectivityType,
486 static_cast<OCQualityOfService>(QoS),
488 assembleHeaderOptions(options, headerOptions),
489 headerOptions.size());
// NOTE(review): nothing visible frees `ctx` on this error path - confirm it is not
// leaked.
494 result = OC_STACK_ERROR;
// Sends a PUT request carrying `rep` through the C stack.
//   host, uri        - concatenated (uri first passes through
//                      assembleSetResourceUri with queryParams) into the request URI
//   connectivityType - forwarded unchanged to OCDoResource()
//   rep              - serialised with assembleSetResourcePayload() as the payload
//   queryParams      - query parameters folded into the URI
//   headerOptions    - converted with assembleHeaderOptions(); its size is passed
//                      as the option count
//   callback         - copied into the heap-allocated SetContext
//   QoS              - cast to OCQualityOfService for the request
// `result` receives the OCDoResource() return value; OC_STACK_INVALID_PARAM and
// OC_STACK_ERROR are the other visible outcomes (their guards are not visible here).
500 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
501 const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
502 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
503 PutCallback& callback, QualityOfService QoS)
507 return OC_STACK_INVALID_PARAM;
509 OCStackResult result;
510 OCCallbackData cbdata = {0};
512 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
513 ctx->callback = callback;
514 cbdata.cb = &setResourceCallback;
// cd deletes the context; presumably invoked by the stack when it drops the
// callback - confirm.
515 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
516 cbdata.context = static_cast<void*>(ctx);
518 // TODO: in the future the cstack should be combining these two strings!
// NOTE(review): the declaration of `os` is not visible in this excerpt.
520 os << host << assembleSetResourceUri(uri, queryParams).c_str();
521 // TODO: end of above
523 auto cLock = m_csdkLock.lock();
527 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Scratch array that assembleHeaderOptions() fills for the C API.
529 OCHeaderOption options[MAX_HEADER_OPTIONS];
// Unlike the GET/POST/DELETE calls, PUT passes a handle pointer instead of nullptr.
// NOTE(review): the declaration of `handle` is not visible in this excerpt.
531 result = OCDoResource(&handle, OC_REST_PUT,
532 os.str().c_str(), nullptr,
533 assembleSetResourcePayload(rep).c_str(), connectivityType,
534 static_cast<OCQualityOfService>(QoS),
536 assembleHeaderOptions(options, headerOptions),
537 headerOptions.size());
// NOTE(review): nothing visible frees `ctx` on this error path - confirm it is not
// leaked.
542 result = OC_STACK_ERROR;
// C-stack callback for DELETE responses.
//   ctx            - ClientCallbackContext::DeleteContext installed by DeleteResource
//   handle         - not referenced in the visible lines
//   clientResponse - response supplying the result code and header options
// Returns OC_STACK_DELETE_TRANSACTION.
548 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
549 OCClientResponse* clientResponse)
551 ClientCallbackContext::DeleteContext* context =
552 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
553 HeaderOptions serverHeaderOptions;
// Header options are only collected for successful responses.
555 if(clientResponse->result == OC_STACK_OK)
557 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
// The user callback runs on its own thread with the options and the result code.
// NOTE(review): the detach/join of `exec` is not visible in this excerpt.
559 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
561 return OC_STACK_DELETE_TRANSACTION;
// Sends a DELETE request for a resource through the C stack.
//   host, uri        - NOTE(review): the lines that build `os` from them are not
//                      visible in this excerpt
//   connectivityType - forwarded unchanged to OCDoResource()
//   headerOptions    - converted with assembleHeaderOptions(); its size is passed
//                      as the option count
//   callback         - copied into the heap-allocated DeleteContext
//   QoS              - NOTE(review): not referenced in the visible lines; the call
//                      passes m_cfg.QoS instead, unlike the other request methods -
//                      confirm this is intended
// `result` receives the OCDoResource() return value; OC_STACK_INVALID_PARAM and
// OC_STACK_ERROR are the other visible outcomes (their guards are not visible here).
564 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
565 const std::string& uri, OCConnectivityType connectivityType,
566 const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
570 return OC_STACK_INVALID_PARAM;
572 OCStackResult result;
573 OCCallbackData cbdata = {0};
575 ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
576 ctx->callback = callback;
577 cbdata.cb = &deleteResourceCallback;
// cd deletes the context; presumably invoked by the stack when it drops the
// callback - confirm.
578 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
579 cbdata.context = static_cast<void*>(ctx);
584 auto cLock = m_csdkLock.lock();
// Scratch array that assembleHeaderOptions() fills for the C API.
588 OCHeaderOption options[MAX_HEADER_OPTIONS];
590 std::lock_guard<std::recursive_mutex> lock(*cLock);
592 result = OCDoResource(nullptr, OC_REST_DELETE,
593 os.str().c_str(), nullptr,
594 nullptr, connectivityType,
595 static_cast<OCQualityOfService>(m_cfg.QoS),
597 assembleHeaderOptions(options, headerOptions),
598 headerOptions.size());
// NOTE(review): nothing visible frees `ctx` on this error path - confirm it is not
// leaked.
603 result = OC_STACK_ERROR;
// C-stack callback for observe notifications.
//   ctx            - ClientCallbackContext::ObserveContext installed by
//                    ObserveResource
//   handle         - not referenced in the visible lines
//   clientResponse - notification supplying result, sequence number, header options
//                    and payload
// Returns OC_STACK_DELETE_TRANSACTION when the sequence number equals
// OC_OBSERVE_DEREGISTER, otherwise OC_STACK_KEEP_TRANSACTION.
609 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
610 OCClientResponse* clientResponse)
612 ClientCallbackContext::ObserveContext* context =
613 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
614 OCRepresentation attrs;
615 HeaderOptions serverHeaderOptions;
616 uint32_t sequenceNumber = clientResponse->sequenceNumber;
617 OCStackResult result = clientResponse->result;
// Header options and payload are only parsed for successful notifications.
618 if(clientResponse->result == OC_STACK_OK)
620 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
623 attrs = parseGetSetCallback(clientResponse);
// NOTE(review): the handler body is not visible in this excerpt.
625 catch(OC::OCException& e)
// The user callback runs on its own thread with the data gathered above.
// NOTE(review): the detach/join of `exec` is not visible in this excerpt.
630 std::thread exec(context->callback, serverHeaderOptions, attrs,
631 result, sequenceNumber);
633 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
635 return OC_STACK_DELETE_TRANSACTION;
637 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation on a resource through the C stack.
//   observeType      - selects the request method (see below)
//   handle           - passed to OCDoResource() as its first argument
//   host, uri        - concatenated (uri first passes through
//                      assembleSetResourceUri with queryParams) into the request URI
//   connectivityType - forwarded unchanged to OCDoResource()
//   queryParams      - query parameters folded into the URI
//   headerOptions    - converted with assembleHeaderOptions(); its size is passed
//                      as the option count
//   callback         - copied into the heap-allocated ObserveContext
//   QoS              - cast to OCQualityOfService for the request
// `result` receives the OCDoResource() return value; OC_STACK_INVALID_PARAM and
// OC_STACK_ERROR are returned directly on the guarded paths (guards not visible).
640 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
641 const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
642 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
643 ObserveCallback& callback, QualityOfService QoS)
647 return OC_STACK_INVALID_PARAM;
649 OCStackResult result;
650 OCCallbackData cbdata = {0};
652 ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
653 ctx->callback = callback;
654 cbdata.context = static_cast<void*>(ctx);
655 cbdata.cb = &observeResourceCallback;
// cd deletes the context; presumably invoked by the stack when it drops the
// callback - confirm.
656 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
// Observe maps to OC_REST_OBSERVE; ObserveAll and the remaining fallback both map
// to OC_REST_OBSERVE_ALL.
659 if (observeType == ObserveType::Observe)
661 method = OC_REST_OBSERVE;
663 else if (observeType == ObserveType::ObserveAll)
665 method = OC_REST_OBSERVE_ALL;
669 method = OC_REST_OBSERVE_ALL;
672 auto cLock = m_csdkLock.lock();
// Full request URI = host followed by the resource URI with its query string.
676 std::ostringstream os;
677 os << host << assembleSetResourceUri(uri, queryParams).c_str();
679 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Scratch array that assembleHeaderOptions() fills for the C API.
680 OCHeaderOption options[MAX_HEADER_OPTIONS];
682 result = OCDoResource(handle, method,
683 os.str().c_str(), nullptr,
684 nullptr, connectivityType,
685 static_cast<OCQualityOfService>(QoS),
687 assembleHeaderOptions(options, headerOptions),
688 headerOptions.size());
// NOTE(review): nothing visible frees `ctx` before this early return - confirm it
// is not leaked.
693 return OC_STACK_ERROR;
// Cancels an observation through OCCancel().
//   handle        - operation handle passed to OCCancel()
//   host, uri     - NOTE(review): not referenced in the visible lines
//   headerOptions - converted with assembleHeaderOptions(); its size is passed as
//                   the option count
//   QoS           - cast to OCQualityOfService for the cancel request
// `result` receives the OCCancel() return value, or OC_STACK_ERROR on the fallback
// path (its guard is not visible in this excerpt).
699 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
700 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
701 QualityOfService QoS)
703 OCStackResult result;
704 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
708 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Scratch array that assembleHeaderOptions() fills for the C API.
709 OCHeaderOption options[MAX_HEADER_OPTIONS];
711 result = OCCancel(handle,
712 static_cast<OCQualityOfService>(QoS),
713 assembleHeaderOptions(options, headerOptions),
714 headerOptions.size());
718 result = OC_STACK_ERROR;
// C-stack callback for presence notifications.
//   ctx            - ClientCallbackContext::SubscribePresenceContext installed by
//                    SubscribePresence
//   handle         - not referenced in the visible lines
//   clientResponse - notification supplying the sender address, result and
//                    sequence number
// Returns OC_STACK_KEEP_TRANSACTION.
724 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
725 OCClientResponse* clientResponse)
// Both conversions must return 0 before the address is used.
734 if(OCDevAddrToIPv4Addr(clientResponse->addr, &a, &b, &c, &d) == 0 &&
735 OCDevAddrToPort(clientResponse->addr, &port) == 0)
// Format the sender as "a.b.c.d:port"; the casts to int make the values print as
// numbers.
737 os<<static_cast<int>(a)<<"."<<static_cast<int>(b)<<"."<<static_cast<int>(c)
738 <<"."<<static_cast<int>(d)<<":"<<static_cast<int>(port);
740 ClientCallbackContext::SubscribePresenceContext* context =
741 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
// The user callback runs on its own thread with result, sequence number and the
// formatted address.
// NOTE(review): the detach/join of `exec` is not visible in this excerpt.
743 std::thread exec(context->callback, clientResponse->result,
744 clientResponse->sequenceNumber, os.str());
// Address conversion failed: log it and skip the user callback.
750 oclog() << "subscribePresenceCallback(): OCDevAddrToIPv4Addr() or OCDevAddrToPort() "
751 <<"failed"<< std::flush;
753 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications from a host through the C stack.
//   handle           - passed to OCDoResource() as its first argument
//   host             - prefixed to OC_PRESENCE_URI to form the request URI
//   resourceType     - when non-empty, appended to the URI as "?rt=<resourceType>"
//   connectivityType - forwarded unchanged to OCDoResource()
//   presenceHandler  - copied into the heap-allocated SubscribePresenceContext
// Returns the OCDoResource() result; OC_STACK_INVALID_PARAM and OC_STACK_ERROR are
// returned on the guarded paths (their guards are not visible in this excerpt).
// The request always uses OC_LOW_QOS and no header options.
756 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
757 const std::string& host, const std::string& resourceType,
758 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
762 return OC_STACK_INVALID_PARAM;
764 OCCallbackData cbdata = {0};
766 ClientCallbackContext::SubscribePresenceContext* ctx =
767 new ClientCallbackContext::SubscribePresenceContext();
768 ctx->callback = presenceHandler;
769 cbdata.cb = &subscribePresenceCallback;
770 cbdata.context = static_cast<void*>(ctx);
// cd deletes the context; presumably invoked by the stack when it drops the
// callback - confirm.
771 cbdata.cd = [](void* c)
772 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
773 auto cLock = m_csdkLock.lock();
775 std::ostringstream os;
776 os << host << OC_PRESENCE_URI;
778 if(!resourceType.empty())
780 os << "?rt=" << resourceType;
786 return OC_STACK_ERROR;
// NOTE(review): no lock_guard on *cLock is visible before this call, unlike the
// other request methods - confirm the stack is not entered unlocked.
789 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
790 connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels a presence subscription through OCCancel().
//   handle - operation handle passed to OCCancel()
// `result` receives the OCCancel() return value (OC_LOW_QOS, no header options),
// or OC_STACK_ERROR on the fallback path (its guard is not visible in this excerpt).
793 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
795 OCStackResult result;
796 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
800 std::lock_guard<std::recursive_mutex> lock(*cLock);
801 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
805 result = OC_STACK_ERROR;
// Presumably reports the wrapper's default quality of service through `qos`.
// NOTE(review): the body is not visible in this excerpt, so this is inferred from
// the signature only - confirm.
811 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
// Copies C++ header options into a caller-provided array of C OCHeaderOption.
//   options       - destination array; the callers in this file pass an array of
//                   MAX_HEADER_OPTIONS elements
//   headerOptions - source options; each supplies an option ID and its data string
// NOTE(review): the value returned for the empty case, the increment of `i` and the
// final return are not visible in this excerpt.
817 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
818 const HeaderOptions& headerOptions)
822 if( headerOptions.size() == 0)
// NOTE(review): no visible check that headerOptions.size() fits in `options`, nor
// that each data string fits in optionData - confirm bounds are enforced.
827 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
829 options[i].protocolID = OC_COAP_ID;
830 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
// Length and copy both include the terminating NUL of the data string.
831 options[i].optionLength = (it->getOptionData()).length() + 1;
832 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
833 (it->getOptionData()).length() + 1);