1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
41 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
43 if(OC_STACK_OK != result)
45 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
49 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
53 InProcClientWrapper::~InProcClientWrapper()
55 if(m_threadRun && m_listeningThread.joinable())
58 m_listeningThread.join();
61 // only stop if we are the ones who actually called 'init'. We are counting
62 // on the server to do the stop.
63 if(m_cfg.mode == ModeType::Client)
69 void InProcClientWrapper::listeningFunc()
74 auto cLock = m_csdkLock.lock();
77 std::lock_guard<std::recursive_mutex> lock(*cLock);
82 result = OC_STACK_ERROR;
85 if(result != OC_STACK_OK)
87 // TODO: do something with result if failed?
90 // To minimize CPU utilization we may wish to do this with sleep
91 std::this_thread::sleep_for(std::chrono::milliseconds(10));
95 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
97 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
99 return OCRepresentation();
105 oc.setJSONRepresentation(clientResponse->resJSONPayload);
107 catch (cereal::RapidJSONException& ex)
109 oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
113 catch (cereal::Exception& ex)
115 oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
120 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121 if(it == oc.representations().end())
123 return OCRepresentation();
126 // first one is considered the root, everything else is considered a child of this one.
127 OCRepresentation root = *it;
130 std::for_each(it, oc.representations().end(),
131 [&root](const OCRepresentation& repItr)
132 {root.addChild(repItr);});
137 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138 OCClientResponse* clientResponse)
140 ClientCallbackContext::ListenContext* context =
141 static_cast<ClientCallbackContext::ListenContext*>(ctx);
143 if(clientResponse->result != OC_STACK_OK)
145 oclog() << "listenCallback(): failed to create resource. clientResponse: "
146 << clientResponse->result
149 return OC_STACK_KEEP_TRANSACTION;
152 auto clientWrapper = context->clientWrapper.lock();
156 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
158 return OC_STACK_KEEP_TRANSACTION;
161 std::stringstream requestStream;
162 requestStream << clientResponse->resJSONPayload;
167 ListenOCContainer container(clientWrapper, *clientResponse->addr,
168 clientResponse->connType, requestStream);
169 // loop to ensure valid construction of all resources
170 for(auto resource : container.Resources())
172 std::thread exec(context->callback, resource);
177 catch(const std::exception& e)
179 oclog() << "listenCallback failed to parse a malformed message: "
182 << clientResponse->resJSONPayload
184 << clientResponse->result
186 return OC_STACK_KEEP_TRANSACTION;
189 return OC_STACK_KEEP_TRANSACTION;
192 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
193 const std::string& resourceType, OCConnectivityType connectivityType,
194 FindCallback& callback, QualityOfService QoS)
196 OCStackResult result;
198 OCCallbackData cbdata = {0};
200 ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
201 context->callback = callback;
202 context->clientWrapper = shared_from_this();
204 cbdata.context = static_cast<void*>(context);
205 cbdata.cb = listenCallback;
206 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
208 auto cLock = m_csdkLock.lock();
211 std::lock_guard<std::recursive_mutex> lock(*cLock);
213 result = OCDoResource(&handle, OC_REST_GET,
214 resourceType.c_str(),
215 nullptr, nullptr, connectivityType,
216 static_cast<OCQualityOfService>(QoS),
223 result = OC_STACK_ERROR;
228 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
229 OCClientResponse* clientResponse)
231 ClientCallbackContext::DeviceListenContext* context =
232 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
236 OCRepresentation rep = parseGetSetCallback(clientResponse);
237 std::thread exec(context->callback, rep);
240 catch(OC::OCException& e)
242 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
243 <<e.what() <<std::flush;
246 return OC_STACK_KEEP_TRANSACTION;
249 OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
250 const std::string& deviceURI, OCConnectivityType connectivityType,
251 FindDeviceCallback& callback, QualityOfService QoS)
253 OCStackResult result;
255 OCCallbackData cbdata = {0};
256 ClientCallbackContext::DeviceListenContext* context =
257 new ClientCallbackContext::DeviceListenContext();
258 context->callback = callback;
259 context->clientWrapper = shared_from_this();
260 cbdata.context = static_cast<void*>(context);
261 cbdata.cb = listenDeviceCallback;
262 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
264 auto cLock = m_csdkLock.lock();
267 std::lock_guard<std::recursive_mutex> lock(*cLock);
269 result = OCDoResource(&handle, OC_REST_GET,
271 nullptr, nullptr, connectivityType,
272 static_cast<OCQualityOfService>(QoS),
279 result = OC_STACK_ERROR;
284 void parseServerHeaderOptions(OCClientResponse* clientResponse,
285 HeaderOptions& serverHeaderOptions)
289 // Parse header options from server
291 std::string optionData;
293 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
295 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
296 optionData = reinterpret_cast<const char*>
297 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
298 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
299 serverHeaderOptions.push_back(headerOption);
304 // clientResponse is invalid
305 // TODO check proper logging
306 std::cout << " Invalid response " << std::endl;
310 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
311 OCClientResponse* clientResponse)
313 ClientCallbackContext::GetContext* context =
314 static_cast<ClientCallbackContext::GetContext*>(ctx);
316 OCRepresentation rep;
317 HeaderOptions serverHeaderOptions;
318 OCStackResult result = clientResponse->result;
319 if(result == OC_STACK_OK)
321 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
324 rep = parseGetSetCallback(clientResponse);
326 catch(OC::OCException& e)
332 std::thread exec(context->callback, serverHeaderOptions, rep, result);
334 return OC_STACK_DELETE_TRANSACTION;
337 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
338 const std::string& uri, OCConnectivityType connectivityType,
339 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
340 GetCallback& callback, QualityOfService QoS)
342 OCStackResult result;
343 OCCallbackData cbdata = {0};
345 ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
346 ctx->callback = callback;
347 cbdata.context = static_cast<void*>(ctx);
348 cbdata.cb = &getResourceCallback;
349 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
351 auto cLock = m_csdkLock.lock();
355 std::ostringstream os;
356 os << host << assembleSetResourceUri(uri, queryParams).c_str();
358 std::lock_guard<std::recursive_mutex> lock(*cLock);
360 OCHeaderOption options[MAX_HEADER_OPTIONS];
362 assembleHeaderOptions(options, headerOptions);
364 result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
365 nullptr, nullptr, connectivityType,
366 static_cast<OCQualityOfService>(QoS),
368 options, headerOptions.size());
373 result = OC_STACK_ERROR;
379 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
380 OCClientResponse* clientResponse)
382 ClientCallbackContext::SetContext* context =
383 static_cast<ClientCallbackContext::SetContext*>(ctx);
384 OCRepresentation attrs;
385 HeaderOptions serverHeaderOptions;
387 OCStackResult result = clientResponse->result;
388 if (OC_STACK_OK == result ||
389 OC_STACK_RESOURCE_CREATED == result ||
390 OC_STACK_RESOURCE_DELETED == result)
392 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
395 attrs = parseGetSetCallback(clientResponse);
397 catch(OC::OCException& e)
403 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
405 return OC_STACK_DELETE_TRANSACTION;
408 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
409 const QueryParamsMap& queryParams)
411 if(uri.back() == '/')
413 uri.resize(uri.size()-1);
416 ostringstream paramsList;
417 if(queryParams.size() > 0)
422 for(auto& param : queryParams)
424 paramsList << param.first <<'='<<param.second<<'&';
427 std::string queryString = paramsList.str();
428 if(queryString.back() == '&')
430 queryString.resize(queryString.size() - 1);
433 std::string ret = uri + queryString;
437 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
439 MessageContainer ocInfo;
440 ocInfo.addRepresentation(rep);
441 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
444 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
445 const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
446 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
447 PostCallback& callback, QualityOfService QoS)
449 OCStackResult result;
450 OCCallbackData cbdata = {0};
452 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
453 ctx->callback = callback;
454 cbdata.cb = &setResourceCallback;
455 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
456 cbdata.context = static_cast<void*>(ctx);
458 // TODO: in the future the cstack should be combining these two strings!
460 os << host << assembleSetResourceUri(uri, queryParams).c_str();
461 // TODO: end of above
463 auto cLock = m_csdkLock.lock();
467 std::lock_guard<std::recursive_mutex> lock(*cLock);
468 OCHeaderOption options[MAX_HEADER_OPTIONS];
471 assembleHeaderOptions(options, headerOptions);
472 result = OCDoResource(&handle, OC_REST_POST,
473 os.str().c_str(), nullptr,
474 assembleSetResourcePayload(rep).c_str(), connectivityType,
475 static_cast<OCQualityOfService>(QoS),
476 &cbdata, options, headerOptions.size());
481 result = OC_STACK_ERROR;
487 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
488 const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
489 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
490 PutCallback& callback, QualityOfService QoS)
492 OCStackResult result;
493 OCCallbackData cbdata = {0};
495 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
496 ctx->callback = callback;
497 cbdata.cb = &setResourceCallback;
498 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
499 cbdata.context = static_cast<void*>(ctx);
501 // TODO: in the future the cstack should be combining these two strings!
503 os << host << assembleSetResourceUri(uri, queryParams).c_str();
504 // TODO: end of above
506 auto cLock = m_csdkLock.lock();
510 std::lock_guard<std::recursive_mutex> lock(*cLock);
512 OCHeaderOption options[MAX_HEADER_OPTIONS];
514 assembleHeaderOptions(options, headerOptions);
515 result = OCDoResource(&handle, OC_REST_PUT,
516 os.str().c_str(), nullptr,
517 assembleSetResourcePayload(rep).c_str(), connectivityType,
518 static_cast<OCQualityOfService>(QoS),
520 options, headerOptions.size());
525 result = OC_STACK_ERROR;
531 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
532 OCClientResponse* clientResponse)
534 ClientCallbackContext::DeleteContext* context =
535 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
536 HeaderOptions serverHeaderOptions;
538 if(clientResponse->result == OC_STACK_OK)
540 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
542 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
544 return OC_STACK_DELETE_TRANSACTION;
547 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
548 const std::string& uri, OCConnectivityType connectivityType,
549 const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
551 OCStackResult result;
552 OCCallbackData cbdata = {0};
554 ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
555 ctx->callback = callback;
556 cbdata.cb = &deleteResourceCallback;
557 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
558 cbdata.context = static_cast<void*>(ctx);
563 auto cLock = m_csdkLock.lock();
567 OCHeaderOption options[MAX_HEADER_OPTIONS];
570 assembleHeaderOptions(options, headerOptions);
572 std::lock_guard<std::recursive_mutex> lock(*cLock);
574 result = OCDoResource(&handle, OC_REST_DELETE,
575 os.str().c_str(), nullptr,
576 nullptr, connectivityType,
577 static_cast<OCQualityOfService>(m_cfg.QoS),
578 &cbdata, options, headerOptions.size());
583 result = OC_STACK_ERROR;
589 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
590 OCClientResponse* clientResponse)
592 ClientCallbackContext::ObserveContext* context =
593 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
594 OCRepresentation attrs;
595 HeaderOptions serverHeaderOptions;
596 uint32_t sequenceNumber = clientResponse->sequenceNumber;
597 OCStackResult result = clientResponse->result;
598 if(clientResponse->result == OC_STACK_OK)
600 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
603 attrs = parseGetSetCallback(clientResponse);
605 catch(OC::OCException& e)
610 std::thread exec(context->callback, serverHeaderOptions, attrs,
611 result, sequenceNumber);
613 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
615 return OC_STACK_DELETE_TRANSACTION;
617 return OC_STACK_KEEP_TRANSACTION;
620 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
621 const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
622 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
623 ObserveCallback& callback, QualityOfService QoS)
625 OCStackResult result;
626 OCCallbackData cbdata = {0};
628 ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
629 ctx->callback = callback;
630 cbdata.context = static_cast<void*>(ctx);
631 cbdata.cb = &observeResourceCallback;
632 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
635 if (observeType == ObserveType::Observe)
637 method = OC_REST_OBSERVE;
639 else if (observeType == ObserveType::ObserveAll)
641 method = OC_REST_OBSERVE_ALL;
645 method = OC_REST_OBSERVE_ALL;
648 auto cLock = m_csdkLock.lock();
652 std::ostringstream os;
653 os << host << assembleSetResourceUri(uri, queryParams).c_str();
655 std::lock_guard<std::recursive_mutex> lock(*cLock);
656 OCHeaderOption options[MAX_HEADER_OPTIONS];
658 assembleHeaderOptions(options, headerOptions);
659 result = OCDoResource(handle, method,
660 os.str().c_str(), nullptr,
661 nullptr, connectivityType,
662 static_cast<OCQualityOfService>(QoS),
664 options, headerOptions.size());
669 return OC_STACK_ERROR;
675 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
676 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
677 QualityOfService QoS)
679 OCStackResult result;
680 auto cLock = m_csdkLock.lock();
684 std::lock_guard<std::recursive_mutex> lock(*cLock);
685 OCHeaderOption options[MAX_HEADER_OPTIONS];
687 assembleHeaderOptions(options, headerOptions);
688 result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
689 headerOptions.size());
693 result = OC_STACK_ERROR;
699 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
700 OCClientResponse* clientResponse)
702 char stringAddress[DEV_ADDR_SIZE_MAX];
706 if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
707 OCDevAddrToPort(clientResponse->addr, &port) == 0)
709 os<<stringAddress<<":"<<port;
711 ClientCallbackContext::SubscribePresenceContext* context =
712 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
714 std::thread exec(context->callback, clientResponse->result,
715 clientResponse->sequenceNumber, os.str());
721 oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
722 <<"failed"<< std::flush;
724 return OC_STACK_KEEP_TRANSACTION;
727 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
728 const std::string& host, const std::string& resourceType,
729 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
731 OCCallbackData cbdata = {0};
733 ClientCallbackContext::SubscribePresenceContext* ctx =
734 new ClientCallbackContext::SubscribePresenceContext();
735 ctx->callback = presenceHandler;
736 cbdata.cb = &subscribePresenceCallback;
737 cbdata.context = static_cast<void*>(ctx);
738 cbdata.cd = [](void* c)
739 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
740 auto cLock = m_csdkLock.lock();
742 std::ostringstream os;
743 os << host << "/oc/presence";
745 if(!resourceType.empty())
747 os << "?rt=" << resourceType;
753 return OC_STACK_ERROR;
756 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
757 connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
760 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
762 OCStackResult result;
763 auto cLock = m_csdkLock.lock();
767 std::lock_guard<std::recursive_mutex> lock(*cLock);
768 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
772 result = OC_STACK_ERROR;
// Presumably reports the wrapper's default quality of service through the
// out-parameter `qos` and returns a stack status code.
// NOTE(review): only the signature is visible in this listing; the body is not
// shown, so the value written to `qos` and the returned status cannot be
// confirmed from here.
778 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
// Copies the C++ header options in `headerOptions` into the C-style array
// `options`, one OCHeaderOption per entry.
//
// options        destination array supplied by the caller
// headerOptions  source options; each provides getOptionID() and getOptionData()
//
// NOTE(review): the declaration and increment of the index `i` are not visible
// in this listing - confirm it starts at 0 and advances once per iteration.
// NOTE(review): nothing visible here compares the number of entries with the
// capacity of `options`, or the option data length with the size of
// `optionData`; callers appear to be responsible for both limits - confirm.
784 void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
785 const HeaderOptions& headerOptions)
789 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
// Every outgoing option is tagged with the OC_COAP_ID protocol id.
791 options[i].protocolID = OC_COAP_ID;
792 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
// The stored length and the copy below both include the terminating NUL (+1).
793 options[i].optionLength = (it->getOptionData()).length() + 1;
794 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
795 (it->getOptionData()).length() + 1);