1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
// Builds the client wrapper from the shared C-stack lock and the platform config.
// In client-only mode (ModeType::Client) it initializes the C stack with OCInit()
// using the configured address and port, and throws
// InitializeException(STACK_INIT_ERROR, result) when OCInit() does not return
// OC_STACK_OK. listeningFunc() is started on m_listeningThread.
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
41 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
43 if(OC_STACK_OK != result)
45 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
49 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Tears the wrapper down: joins the listening thread when it was started and is
// still joinable, then handles stack shutdown for the client-only configuration.
53 InProcClientWrapper::~InProcClientWrapper()
55 if(m_threadRun && m_listeningThread.joinable())
58 m_listeningThread.join();
61 // only stop if we are the ones who actually called 'init'. We are counting
62 // on the server to do the stop.
63 if(m_cfg.mode == ModeType::Client)
// Entry point of the listening thread (started from the constructor).
// Promotes the weak C-stack lock; when that succeeds the work is done under a
// std::lock_guard on the recursive mutex, otherwise result is set to
// OC_STACK_ERROR. A failing result is currently not acted upon (see TODO).
// Sleeps for 10 ms to keep CPU usage down.
69 void InProcClientWrapper::listeningFunc()
74 auto cLock = m_csdkLock.lock();
77 std::lock_guard<std::recursive_mutex> lock(*cLock);
82 result = OC_STACK_ERROR;
85 if(result != OC_STACK_OK)
87 // TODO: do something with result if failed?
90 // To minimize CPU utilization we may wish to do this with sleep
91 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Converts the JSON payload of a client response into an OCRepresentation.
//
// clientResponse is dereferenced without a null check, so callers must pass a
// valid pointer.
//
// Returns an empty OCRepresentation when the payload is null or an empty string,
// or when the parsed container holds no representations. Otherwise the first
// parsed representation is the root and representations from the container are
// attached to it with addChild().
//
// Throws OCException(INVALID_REPRESENTATION, OC_STACK_INVALID_JSON) after logging
// when cereal / RapidJSON reject the payload.
95 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
97 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
99 return OCRepresentation();
105 oc.setJSONRepresentation(clientResponse->resJSONPayload);
107 catch (cereal::RapidJSONException& ex)
109 oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
113 catch (cereal::Exception& ex)
115 oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
120 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121 if(it == oc.representations().end())
123 return OCRepresentation();
126 // first one is considered the root, everything else is considered a child of this one.
127 OCRepresentation root = *it;
// NOTE(review): the range below starts at `it`. Confirm the iterator is advanced
// past the root before this call; otherwise the root is also added as its own child.
130 std::for_each(it, oc.representations().end(),
131 [&root](const OCRepresentation& repItr)
132 {root.addChild(repItr);});
// C-stack callback for resource-discovery responses.
//
// ctx is the ClientCallbackContext::ListenContext registered by ListenForResource.
// A non-OK response result, or a client wrapper that can no longer be locked from
// the context's weak reference, is logged and the response is ignored.
// Otherwise the JSON payload is handed to ListenOCContainer and the user callback
// is run on a new std::thread for every resource in the container.
// A std::exception raised while parsing is logged.
//
// Every visible return path yields OC_STACK_KEEP_TRANSACTION, so the discovery
// request stays registered with the stack.
137 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138 OCClientResponse* clientResponse)
140 ClientCallbackContext::ListenContext* context =
141 static_cast<ClientCallbackContext::ListenContext*>(ctx);
143 if(clientResponse->result != OC_STACK_OK)
145 oclog() << "listenCallback(): failed to create resource. clientResponse: "
146 << clientResponse->result
149 return OC_STACK_KEEP_TRANSACTION;
152 auto clientWrapper = context->clientWrapper.lock();
156 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
158 return OC_STACK_KEEP_TRANSACTION;
161 std::stringstream requestStream;
162 requestStream << clientResponse->resJSONPayload;
166 ListenOCContainer container(clientWrapper, *clientResponse->addr,
169 // loop to ensure valid construction of all resources
// NOTE(review): `auto resource` copies each element of Resources(); a reference
// would avoid the extra copy if the element type allows it.
170 for(auto resource : container.Resources())
172 std::thread exec(context->callback, resource);
177 catch(const std::exception& e)
179 oclog() << "listenCallback failed to parse a malformed message: "
181 << std::endl <<std::endl
182 << clientResponse->result
184 return OC_STACK_KEEP_TRANSACTION;
187 return OC_STACK_KEEP_TRANSACTION;
// Starts resource discovery.
//
// Allocates a ListenContext carrying the user callback and a reference to this
// wrapper (via shared_from_this()), registers listenCallback together with a
// deleter for that context, and issues an OC_REST_GET through OCDoResource() while
// holding the C-stack lock. result is OC_STACK_ERROR when the lock cannot be
// obtained.
//
// NOTE(review): the context is created with `new` before the lock is checked;
// confirm it is released on the OC_STACK_ERROR path (the deleter is only reachable
// through cbdata once the stack owns it).
// NOTE(review): serviceUrl does not appear to be used; the request URI handed to
// OCDoResource() is resourceType. Confirm this is intended.
190 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
191 const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
193 OCStackResult result;
195 OCCallbackData cbdata = {0};
197 ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
198 context->callback = callback;
199 context->clientWrapper = shared_from_this();
201 cbdata.context = static_cast<void*>(context);
202 cbdata.cb = listenCallback;
203 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
205 auto cLock = m_csdkLock.lock();
208 std::lock_guard<std::recursive_mutex> lock(*cLock);
210 result = OCDoResource(&handle, OC_REST_GET,
211 resourceType.c_str(),
213 static_cast<OCQualityOfService>(QoS),
220 result = OC_STACK_ERROR;
// C-stack callback for device-discovery responses.
//
// ctx is the ClientCallbackContext::DeviceListenContext registered by
// ListenForDevice. The payload is converted with parseGetSetCallback() and the
// user callback is run on a new std::thread with the resulting representation.
// An OCException from parsing is logged and the response is ignored.
// Returns OC_STACK_KEEP_TRANSACTION.
225 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
226 OCClientResponse* clientResponse)
228 ClientCallbackContext::DeviceListenContext* context =
229 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
233 OCRepresentation rep = parseGetSetCallback(clientResponse);
234 std::thread exec(context->callback, rep);
237 catch(OC::OCException& e)
239 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
240 <<e.what() <<std::flush;
243 return OC_STACK_KEEP_TRANSACTION;
// Starts device discovery.
//
// Allocates a DeviceListenContext carrying the user callback and a reference to
// this wrapper, registers listenDeviceCallback together with a deleter for that
// context, and issues an OC_REST_GET through OCDoResource() while holding the
// C-stack lock. result is OC_STACK_ERROR when the lock cannot be obtained.
//
// NOTE(review): the context is created with `new` before the lock is checked;
// confirm it is released on the OC_STACK_ERROR path.
246 OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
247 const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
249 OCStackResult result;
251 OCCallbackData cbdata = {0};
253 ClientCallbackContext::DeviceListenContext* context =
254 new ClientCallbackContext::DeviceListenContext();
255 context->callback = callback;
256 context->clientWrapper = shared_from_this();
258 cbdata.context = static_cast<void*>(context);
259 cbdata.cb = listenDeviceCallback;
260 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
262 auto cLock = m_csdkLock.lock();
265 std::lock_guard<std::recursive_mutex> lock(*cLock);
267 result = OCDoResource(&handle, OC_REST_GET,
270 static_cast<OCQualityOfService>(QoS),
276 result = OC_STACK_ERROR;
// Appends the vendor-specific header options received with a response to
// serverHeaderOptions, one HeaderOption::OCHeaderOption (id + data) per entry of
// rcvdVendorSpecificHeaderOptions.
//
// NOTE(review): optionData is reinterpreted as a C string and assigned to a
// std::string, so it is read up to the first NUL byte. The option's length is not
// consulted here; confirm the stack always NUL-terminates the buffer, otherwise
// binary or unterminated option data is truncated or over-read.
// NOTE(review): the invalid-response branch writes to std::cout while the other
// functions in this file log through oclog().
281 void parseServerHeaderOptions(OCClientResponse* clientResponse,
282 HeaderOptions& serverHeaderOptions)
286 // Parse header options from server
288 std::string optionData;
290 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
292 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
293 optionData = reinterpret_cast<const char*>
294 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
295 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
296 serverHeaderOptions.push_back(headerOption);
301 // clientResponse is invalid
302 // TODO check proper logging
303 std::cout << " Invalid response " << std::endl;
// C-stack callback for GET responses.
//
// ctx is the ClientCallbackContext::GetContext registered by
// GetResourceRepresentation. When the response result is OC_STACK_OK the server
// header options and the JSON representation are parsed; an OCException from
// parsing is caught here. The user callback is then run on a new std::thread with
// the header options, the representation and the result.
// Returns OC_STACK_DELETE_TRANSACTION: a GET is a one-shot request.
307 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
308 OCClientResponse* clientResponse)
310 ClientCallbackContext::GetContext* context =
311 static_cast<ClientCallbackContext::GetContext*>(ctx);
313 OCRepresentation rep;
314 HeaderOptions serverHeaderOptions;
315 OCStackResult result = clientResponse->result;
316 if(result == OC_STACK_OK)
318 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
321 rep = parseGetSetCallback(clientResponse);
323 catch(OC::OCException& e)
329 std::thread exec(context->callback, serverHeaderOptions, rep, result);
331 return OC_STACK_DELETE_TRANSACTION;
// Sends a GET for `uri` on `host`.
//
// The request URI is host followed by assembleSetResourceUri(uri, queryParams).
// headerOptions are converted into a stack array with assembleHeaderOptions() and
// the request is issued through OCDoResource() while holding the C-stack lock;
// getResourceCallback delivers the response to `callback`.
// result is OC_STACK_ERROR when the lock cannot be obtained.
//
// NOTE(review): `options` holds MAX_HEADER_OPTIONS entries but
// headerOptions.size() is passed on unchecked; confirm callers never supply more.
// NOTE(review): ctx is created with `new` before the lock is checked; confirm it
// is released on the OC_STACK_ERROR path.
334 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
335 const std::string& uri, const QueryParamsMap& queryParams,
336 const HeaderOptions& headerOptions, GetCallback& callback,
337 QualityOfService QoS)
339 OCStackResult result;
340 OCCallbackData cbdata = {0};
342 ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
343 ctx->callback = callback;
344 cbdata.context = static_cast<void*>(ctx);
345 cbdata.cb = &getResourceCallback;
346 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
348 auto cLock = m_csdkLock.lock();
352 std::ostringstream os;
353 os << host << assembleSetResourceUri(uri, queryParams).c_str();
355 std::lock_guard<std::recursive_mutex> lock(*cLock);
357 OCHeaderOption options[MAX_HEADER_OPTIONS];
359 assembleHeaderOptions(options, headerOptions);
360 result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
362 static_cast<OCQualityOfService>(QoS),
364 options, headerOptions.size());
369 result = OC_STACK_ERROR;
// C-stack callback shared by PUT and POST requests.
//
// ctx is the ClientCallbackContext::SetContext registered by
// PutResourceRepresentation / PostResourceRepresentation. For the results
// OC_STACK_OK, OC_STACK_RESOURCE_CREATED and OC_STACK_RESOURCE_DELETED the server
// header options and the JSON representation are parsed; an OCException from
// parsing is caught here. The user callback is then run on a new std::thread with
// the header options, the representation and the result.
// Returns OC_STACK_DELETE_TRANSACTION.
375 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
376 OCClientResponse* clientResponse)
378 ClientCallbackContext::SetContext* context =
379 static_cast<ClientCallbackContext::SetContext*>(ctx);
380 OCRepresentation attrs;
381 HeaderOptions serverHeaderOptions;
383 OCStackResult result = clientResponse->result;
384 if (OC_STACK_OK == result ||
385 OC_STACK_RESOURCE_CREATED == result ||
386 OC_STACK_RESOURCE_DELETED == result)
388 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
391 attrs = parseGetSetCallback(clientResponse);
393 catch(OC::OCException& e)
399 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
401 return OC_STACK_DELETE_TRANSACTION;
// Builds the request path for a resource: `uri` without a trailing '/', followed
// by the query string assembled from queryParams as "key=value" pairs joined with
// '&' (the trailing '&' left by the loop is removed).
//
// NOTE(review): uri.back() and queryString.back() are evaluated without checking
// for an empty string. std::string::back() on an empty string is undefined
// behavior, and queryString is empty whenever queryParams is empty. Both checks
// should be guarded with !empty().
404 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
405 const QueryParamsMap& queryParams)
407 if(uri.back() == '/')
409 uri.resize(uri.size()-1);
412 ostringstream paramsList;
413 if(queryParams.size() > 0)
418 for(auto& param : queryParams)
420 paramsList << param.first <<'='<<param.second<<'&';
423 std::string queryString = paramsList.str();
424 if(queryString.back() == '&')
426 queryString.resize(queryString.size() - 1);
429 std::string ret = uri + queryString;
// Serializes `rep` into the JSON payload sent with PUT/POST requests: the
// representation is placed in a MessageContainer and rendered with
// OCInfoFormat::IncludeOC.
433 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
435 MessageContainer ocInfo;
436 ocInfo.addRepresentation(rep);
437 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
// Sends a POST carrying `rep` to `uri` on `host`.
//
// The request URI is host followed by assembleSetResourceUri(uri, queryParams);
// the payload comes from assembleSetResourcePayload(rep). headerOptions are
// converted with assembleHeaderOptions() and the request is issued through
// OCDoResource() while holding the C-stack lock; setResourceCallback delivers the
// response to `callback`. result is OC_STACK_ERROR when the lock cannot be
// obtained.
//
// NOTE(review): `options` holds MAX_HEADER_OPTIONS entries but
// headerOptions.size() is passed on unchecked; confirm callers never supply more.
// NOTE(review): ctx is created with `new` before the lock is checked; confirm it
// is released on the OC_STACK_ERROR path.
440 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
441 const std::string& uri, const OCRepresentation& rep,
442 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
443 PostCallback& callback, QualityOfService QoS)
445 OCStackResult result;
446 OCCallbackData cbdata = {0};
448 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
449 ctx->callback = callback;
450 cbdata.cb = &setResourceCallback;
451 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
452 cbdata.context = static_cast<void*>(ctx);
454 // TODO: in the future the cstack should be combining these two strings!
456 os << host << assembleSetResourceUri(uri, queryParams).c_str();
457 // TODO: end of above
459 auto cLock = m_csdkLock.lock();
463 std::lock_guard<std::recursive_mutex> lock(*cLock);
464 OCHeaderOption options[MAX_HEADER_OPTIONS];
467 assembleHeaderOptions(options, headerOptions);
468 result = OCDoResource(&handle, OC_REST_POST,
469 os.str().c_str(), nullptr,
470 assembleSetResourcePayload(rep).c_str(),
471 static_cast<OCQualityOfService>(QoS),
472 &cbdata, options, headerOptions.size());
477 result = OC_STACK_ERROR;
// Sends a PUT carrying `rep` to `uri` on `host`.
//
// Mirrors PostResourceRepresentation with OC_REST_PUT as the method: the request
// URI is host followed by assembleSetResourceUri(uri, queryParams), the payload
// comes from assembleSetResourcePayload(rep), and setResourceCallback delivers the
// response to `callback`. The request is issued while holding the C-stack lock;
// result is OC_STACK_ERROR when the lock cannot be obtained.
//
// NOTE(review): `options` holds MAX_HEADER_OPTIONS entries but
// headerOptions.size() is passed on unchecked; confirm callers never supply more.
// NOTE(review): ctx is created with `new` before the lock is checked; confirm it
// is released on the OC_STACK_ERROR path.
484 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
485 const std::string& uri, const OCRepresentation& rep,
486 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
487 PutCallback& callback, QualityOfService QoS)
489 OCStackResult result;
490 OCCallbackData cbdata = {0};
492 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
493 ctx->callback = callback;
494 cbdata.cb = &setResourceCallback;
495 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
496 cbdata.context = static_cast<void*>(ctx);
498 // TODO: in the future the cstack should be combining these two strings!
500 os << host << assembleSetResourceUri(uri, queryParams).c_str();
501 // TODO: end of above
503 auto cLock = m_csdkLock.lock();
507 std::lock_guard<std::recursive_mutex> lock(*cLock);
509 OCHeaderOption options[MAX_HEADER_OPTIONS];
511 assembleHeaderOptions(options, headerOptions);
512 result = OCDoResource(&handle, OC_REST_PUT,
513 os.str().c_str(), nullptr,
514 assembleSetResourcePayload(rep).c_str(),
515 static_cast<OCQualityOfService>(QoS),
517 options, headerOptions.size());
522 result = OC_STACK_ERROR;
// C-stack callback for DELETE responses.
//
// ctx is the ClientCallbackContext::DeleteContext registered by DeleteResource.
// Server header options are parsed only when the response result is OC_STACK_OK.
// The user callback is run on a new std::thread with the header options and the
// response result. Returns OC_STACK_DELETE_TRANSACTION.
528 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
529 OCClientResponse* clientResponse)
531 ClientCallbackContext::DeleteContext* context =
532 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
533 HeaderOptions serverHeaderOptions;
535 if(clientResponse->result == OC_STACK_OK)
537 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
539 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
541 return OC_STACK_DELETE_TRANSACTION;
// Sends a DELETE for `uri` on `host`.
//
// headerOptions are converted with assembleHeaderOptions() and the request is
// issued through OCDoResource() (no payload) while holding the C-stack lock;
// deleteResourceCallback delivers the response to `callback`.
// result is OC_STACK_ERROR when the lock cannot be obtained.
//
// NOTE(review): the QoS parameter is not used for the request. OCDoResource() is
// given m_cfg.QoS instead, unlike the GET/PUT/POST/observe methods which pass the
// caller's QoS. This looks like a bug; confirm and switch to `QoS`.
// NOTE(review): ctx is created with `new` before the lock is checked; confirm it
// is released on the OC_STACK_ERROR path.
544 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
545 const std::string& uri, const HeaderOptions& headerOptions,
546 DeleteCallback& callback, QualityOfService QoS)
548 OCStackResult result;
549 OCCallbackData cbdata = {0};
551 ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
552 ctx->callback = callback;
553 cbdata.cb = &deleteResourceCallback;
554 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
555 cbdata.context = static_cast<void*>(ctx);
560 auto cLock = m_csdkLock.lock();
564 OCHeaderOption options[MAX_HEADER_OPTIONS];
567 assembleHeaderOptions(options, headerOptions);
569 std::lock_guard<std::recursive_mutex> lock(*cLock);
571 result = OCDoResource(&handle, OC_REST_DELETE,
572 os.str().c_str(), nullptr,
573 nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
574 &cbdata, options, headerOptions.size());
579 result = OC_STACK_ERROR;
// C-stack callback for observe notifications.
//
// ctx is the ClientCallbackContext::ObserveContext registered by ObserveResource.
// When the response result is OC_STACK_OK the server header options and the JSON
// representation are parsed; an OCException from parsing is caught here. The user
// callback is run on a new std::thread with the header options, the
// representation, the result and the notification's sequence number.
//
// Returns OC_STACK_DELETE_TRANSACTION when the sequence number equals
// OC_OBSERVE_DEREGISTER, otherwise OC_STACK_KEEP_TRANSACTION so that further
// notifications keep arriving.
585 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
586 OCClientResponse* clientResponse)
588 ClientCallbackContext::ObserveContext* context =
589 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
590 OCRepresentation attrs;
591 HeaderOptions serverHeaderOptions;
592 uint32_t sequenceNumber = clientResponse->sequenceNumber;
593 OCStackResult result = clientResponse->result;
594 if(clientResponse->result == OC_STACK_OK)
596 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
599 attrs = parseGetSetCallback(clientResponse);
601 catch(OC::OCException& e)
606 std::thread exec(context->callback, serverHeaderOptions, attrs,
607 result, sequenceNumber);
609 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
611 return OC_STACK_DELETE_TRANSACTION;
613 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation of `uri` on `host`.
//
// observeType selects the request method: ObserveType::Observe maps to
// OC_REST_OBSERVE, ObserveType::ObserveAll to OC_REST_OBSERVE_ALL, and any other
// value also falls back to OC_REST_OBSERVE_ALL. The request URI is host followed
// by assembleSetResourceUri(uri, queryParams). The request is issued through
// OCDoResource() while holding the C-stack lock, writing the operation handle to
// the caller's `handle`; observeResourceCallback delivers notifications to
// `callback`. Returns OC_STACK_ERROR when the lock cannot be obtained.
//
// NOTE(review): `options` holds MAX_HEADER_OPTIONS entries but
// headerOptions.size() is passed on unchecked; confirm callers never supply more.
// NOTE(review): ctx is created with `new` before the lock is checked; confirm it
// is released on the OC_STACK_ERROR path.
616 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
617 const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
618 const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
620 OCStackResult result;
621 OCCallbackData cbdata = {0};
623 ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
624 ctx->callback = callback;
625 cbdata.context = static_cast<void*>(ctx);
626 cbdata.cb = &observeResourceCallback;
627 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
630 if (observeType == ObserveType::Observe)
632 method = OC_REST_OBSERVE;
634 else if (observeType == ObserveType::ObserveAll)
636 method = OC_REST_OBSERVE_ALL;
640 method = OC_REST_OBSERVE_ALL;
643 auto cLock = m_csdkLock.lock();
647 std::ostringstream os;
648 os << host << assembleSetResourceUri(uri, queryParams).c_str();
650 std::lock_guard<std::recursive_mutex> lock(*cLock);
651 OCHeaderOption options[MAX_HEADER_OPTIONS];
653 assembleHeaderOptions(options, headerOptions);
654 result = OCDoResource(handle, method,
655 os.str().c_str(), nullptr,
657 static_cast<OCQualityOfService>(QoS),
659 options, headerOptions.size());
664 return OC_STACK_ERROR;
// Cancels the observation identified by `handle`.
//
// headerOptions are converted with assembleHeaderOptions() and OCCancel() is
// called with the requested QoS while holding the C-stack lock.
// result is OC_STACK_ERROR when the lock cannot be obtained.
//
// NOTE(review): host and uri do not appear to be used by this method; confirm
// they are kept only for interface compatibility.
670 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
671 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
672 QualityOfService QoS)
674 OCStackResult result;
675 auto cLock = m_csdkLock.lock();
679 std::lock_guard<std::recursive_mutex> lock(*cLock);
680 OCHeaderOption options[MAX_HEADER_OPTIONS];
682 assembleHeaderOptions(options, headerOptions);
683 result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
684 headerOptions.size());
688 result = OC_STACK_ERROR;
// C-stack callback for presence notifications.
//
// The sender's address and port are extracted with OCDevAddrToString() and
// OCDevAddrToPort(). When both succeed (return 0) they are formatted as
// "address:port" and the user callback from the SubscribePresenceContext is run on
// a new std::thread with the response result, the sequence number and that host
// string. When either conversion fails the failure is logged instead.
// Returns OC_STACK_KEEP_TRANSACTION so the subscription stays active.
694 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
695 OCClientResponse* clientResponse)
697 char stringAddress[DEV_ADDR_SIZE_MAX];
701 if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
702 OCDevAddrToPort(clientResponse->addr, &port) == 0)
704 os<<stringAddress<<":"<<port;
706 ClientCallbackContext::SubscribePresenceContext* context =
707 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
709 std::thread exec(context->callback, clientResponse->result,
710 clientResponse->sequenceNumber, os.str());
716 oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
717 <<"failed"<< std::flush;
719 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications from `host`.
//
// The request URI is host + "/oc/presence", with "?rt=<resourceType>" appended
// when resourceType is not empty. The request is issued through OCDoResource()
// with OC_REST_PRESENCE at OC_LOW_QOS and no header options;
// subscribePresenceCallback delivers notifications to presenceHandler.
// Returns OC_STACK_ERROR when the C-stack lock cannot be obtained, otherwise the
// result of OCDoResource().
//
// NOTE(review): cLock is obtained and checked, but no std::lock_guard on *cLock
// appears before OCDoResource(), unlike the other request methods in this file.
// Confirm the stack call is actually serialized.
// NOTE(review): ctx is created with `new`; confirm it is released on the early
// OC_STACK_ERROR return.
722 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
723 const std::string& host, const std::string& resourceType,
724 SubscribeCallback& presenceHandler)
726 OCCallbackData cbdata = {0};
728 ClientCallbackContext::SubscribePresenceContext* ctx =
729 new ClientCallbackContext::SubscribePresenceContext();
730 ctx->callback = presenceHandler;
731 cbdata.cb = &subscribePresenceCallback;
732 cbdata.context = static_cast<void*>(ctx);
733 cbdata.cd = [](void* c)
734 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
735 auto cLock = m_csdkLock.lock();
737 std::ostringstream os;
738 os << host << "/oc/presence";
740 if(!resourceType.empty())
742 os << "?rt=" << resourceType;
748 return OC_STACK_ERROR;
751 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
752 OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels the presence subscription identified by `handle` by calling OCCancel()
// at OC_LOW_QOS with no header options while holding the C-stack lock.
// result is OC_STACK_ERROR when the lock cannot be obtained.
755 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
757 OCStackResult result;
758 auto cLock = m_csdkLock.lock();
762 std::lock_guard<std::recursive_mutex> lock(*cLock);
763 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
767 result = OC_STACK_ERROR;
773 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
// Converts the C++ header options into the C stack's OCHeaderOption array.
//
// For each entry of headerOptions, options[i] receives protocol OC_COAP_ID, the
// option id narrowed to uint16_t, and a copy of the option data including its
// terminating NUL (optionLength is the string length + 1).
//
// NOTE(review): no check is visible that the number of options stays within the
// caller's array (MAX_HEADER_OPTIONS at every call site in this file), nor that
// length + 1 fits into optionData. Confirm both bounds; otherwise this memcpy can
// write past the buffers.
779 void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
780 const HeaderOptions& headerOptions)
784 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
786 options[i].protocolID = OC_COAP_ID;
787 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
788 options[i].optionLength = (it->getOptionData()).length() + 1;
789 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
790 (it->getOptionData()).length() + 1);