1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
41 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
43 if(OC_STACK_OK != result)
45 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
49 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
53 InProcClientWrapper::~InProcClientWrapper()
55 if(m_threadRun && m_listeningThread.joinable())
58 m_listeningThread.join();
61 // only stop if we are the ones who actually called 'init'. We are counting
62 // on the server to do the stop.
63 if(m_cfg.mode == ModeType::Client)
69 void InProcClientWrapper::listeningFunc()
74 auto cLock = m_csdkLock.lock();
77 std::lock_guard<std::recursive_mutex> lock(*cLock);
82 result = OC_STACK_ERROR;
85 if(result != OC_STACK_OK)
87 // TODO: do something with result if failed?
90 // To minimize CPU utilization we may wish to do this with sleep
91 std::this_thread::sleep_for(std::chrono::milliseconds(10));
95 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
97 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
99 return OCRepresentation();
105 oc.setJSONRepresentation(clientResponse->resJSONPayload);
107 catch (cereal::RapidJSONException& ex)
109 oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
110 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
111 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
113 catch (cereal::Exception& ex)
115 oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
116 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
117 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
120 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
121 if(it == oc.representations().end())
123 return OCRepresentation();
126 // first one is considered the root, everything else is considered a child of this one.
127 OCRepresentation root = *it;
130 std::for_each(it, oc.representations().end(),
131 [&root](const OCRepresentation& repItr)
132 {root.addChild(repItr);});
137 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
138 OCClientResponse* clientResponse)
140 ClientCallbackContext::ListenContext* context =
141 static_cast<ClientCallbackContext::ListenContext*>(ctx);
143 if(clientResponse->result != OC_STACK_OK)
145 oclog() << "listenCallback(): failed to create resource. clientResponse: "
146 << clientResponse->result
149 return OC_STACK_KEEP_TRANSACTION;
152 auto clientWrapper = context->clientWrapper.lock();
156 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
158 return OC_STACK_KEEP_TRANSACTION;
161 std::stringstream requestStream;
162 requestStream << clientResponse->resJSONPayload;
167 ListenOCContainer container(clientWrapper, *clientResponse->addr,
168 clientResponse->connType, requestStream);
169 // loop to ensure valid construction of all resources
170 for(auto resource : container.Resources())
172 std::thread exec(context->callback, resource);
177 catch(const std::exception& e)
179 oclog() << "listenCallback failed to parse a malformed message: "
182 << clientResponse->resJSONPayload
184 << clientResponse->result
186 return OC_STACK_KEEP_TRANSACTION;
189 return OC_STACK_KEEP_TRANSACTION;
192 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
193 const std::string& resourceType, OCConnectivityType connectivityType,
194 FindCallback& callback, QualityOfService QoS)
196 OCStackResult result;
198 OCCallbackData cbdata = {0};
200 ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
201 context->callback = callback;
202 context->clientWrapper = shared_from_this();
204 cbdata.context = static_cast<void*>(context);
205 cbdata.cb = listenCallback;
206 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
208 auto cLock = m_csdkLock.lock();
211 std::lock_guard<std::recursive_mutex> lock(*cLock);
213 result = OCDoResource(&handle, OC_REST_GET,
214 resourceType.c_str(),
215 nullptr, nullptr, connectivityType,
216 static_cast<OCQualityOfService>(QoS),
223 result = OC_STACK_ERROR;
228 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
229 OCClientResponse* clientResponse)
231 ClientCallbackContext::DeviceListenContext* context =
232 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
236 OCRepresentation rep = parseGetSetCallback(clientResponse);
237 std::thread exec(context->callback, rep);
240 catch(OC::OCException& e)
242 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
243 <<e.what() <<std::flush;
246 return OC_STACK_KEEP_TRANSACTION;
249 OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
250 const std::string& deviceURI, OCConnectivityType connectivityType,
251 FindDeviceCallback& callback, QualityOfService QoS)
253 OCStackResult result;
255 OCCallbackData cbdata = {0};
256 ClientCallbackContext::DeviceListenContext* context =
257 new ClientCallbackContext::DeviceListenContext();
258 context->callback = callback;
259 context->clientWrapper = shared_from_this();
260 cbdata.context = static_cast<void*>(context);
261 cbdata.cb = listenDeviceCallback;
262 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
264 auto cLock = m_csdkLock.lock();
267 std::lock_guard<std::recursive_mutex> lock(*cLock);
268 result = OCDoResource(nullptr, OC_REST_GET,
270 nullptr, nullptr, connectivityType,
271 static_cast<OCQualityOfService>(QoS),
278 result = OC_STACK_ERROR;
283 void parseServerHeaderOptions(OCClientResponse* clientResponse,
284 HeaderOptions& serverHeaderOptions)
288 // Parse header options from server
290 std::string optionData;
292 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
294 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
295 optionData = reinterpret_cast<const char*>
296 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
297 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
298 serverHeaderOptions.push_back(headerOption);
303 // clientResponse is invalid
304 // TODO check proper logging
305 std::cout << " Invalid response " << std::endl;
309 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
310 OCClientResponse* clientResponse)
312 ClientCallbackContext::GetContext* context =
313 static_cast<ClientCallbackContext::GetContext*>(ctx);
315 OCRepresentation rep;
316 HeaderOptions serverHeaderOptions;
317 OCStackResult result = clientResponse->result;
318 if(result == OC_STACK_OK)
320 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
323 rep = parseGetSetCallback(clientResponse);
325 catch(OC::OCException& e)
331 std::thread exec(context->callback, serverHeaderOptions, rep, result);
333 return OC_STACK_DELETE_TRANSACTION;
336 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
337 const std::string& uri, OCConnectivityType connectivityType,
338 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
339 GetCallback& callback, QualityOfService QoS)
341 OCStackResult result;
342 OCCallbackData cbdata = {0};
344 ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
345 ctx->callback = callback;
346 cbdata.context = static_cast<void*>(ctx);
347 cbdata.cb = &getResourceCallback;
348 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
350 auto cLock = m_csdkLock.lock();
354 std::ostringstream os;
355 os << host << assembleSetResourceUri(uri, queryParams).c_str();
357 std::lock_guard<std::recursive_mutex> lock(*cLock);
359 OCHeaderOption options[MAX_HEADER_OPTIONS];
361 assembleHeaderOptions(options, headerOptions);
363 result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
364 nullptr, nullptr, connectivityType,
365 static_cast<OCQualityOfService>(QoS),
367 options, headerOptions.size());
372 result = OC_STACK_ERROR;
378 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
379 OCClientResponse* clientResponse)
381 ClientCallbackContext::SetContext* context =
382 static_cast<ClientCallbackContext::SetContext*>(ctx);
383 OCRepresentation attrs;
384 HeaderOptions serverHeaderOptions;
386 OCStackResult result = clientResponse->result;
387 if (OC_STACK_OK == result ||
388 OC_STACK_RESOURCE_CREATED == result ||
389 OC_STACK_RESOURCE_DELETED == result)
391 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
394 attrs = parseGetSetCallback(clientResponse);
396 catch(OC::OCException& e)
402 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
404 return OC_STACK_DELETE_TRANSACTION;
407 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
408 const QueryParamsMap& queryParams)
410 if(uri.back() == '/')
412 uri.resize(uri.size()-1);
415 ostringstream paramsList;
416 if(queryParams.size() > 0)
421 for(auto& param : queryParams)
423 paramsList << param.first <<'='<<param.second<<'&';
426 std::string queryString = paramsList.str();
427 if(queryString.back() == '&')
429 queryString.resize(queryString.size() - 1);
432 std::string ret = uri + queryString;
436 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
438 MessageContainer ocInfo;
439 ocInfo.addRepresentation(rep);
440 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
443 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
444 const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
445 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
446 PostCallback& callback, QualityOfService QoS)
448 OCStackResult result;
449 OCCallbackData cbdata = {0};
451 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
452 ctx->callback = callback;
453 cbdata.cb = &setResourceCallback;
454 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
455 cbdata.context = static_cast<void*>(ctx);
457 // TODO: in the future the cstack should be combining these two strings!
459 os << host << assembleSetResourceUri(uri, queryParams).c_str();
460 // TODO: end of above
462 auto cLock = m_csdkLock.lock();
466 std::lock_guard<std::recursive_mutex> lock(*cLock);
467 OCHeaderOption options[MAX_HEADER_OPTIONS];
470 assembleHeaderOptions(options, headerOptions);
471 result = OCDoResource(&handle, OC_REST_POST,
472 os.str().c_str(), nullptr,
473 assembleSetResourcePayload(rep).c_str(), connectivityType,
474 static_cast<OCQualityOfService>(QoS),
475 &cbdata, options, headerOptions.size());
480 result = OC_STACK_ERROR;
486 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
487 const std::string& uri, OCConnectivityType connectivityType, const OCRepresentation& rep,
488 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
489 PutCallback& callback, QualityOfService QoS)
491 OCStackResult result;
492 OCCallbackData cbdata = {0};
494 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
495 ctx->callback = callback;
496 cbdata.cb = &setResourceCallback;
497 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
498 cbdata.context = static_cast<void*>(ctx);
500 // TODO: in the future the cstack should be combining these two strings!
502 os << host << assembleSetResourceUri(uri, queryParams).c_str();
503 // TODO: end of above
505 auto cLock = m_csdkLock.lock();
509 std::lock_guard<std::recursive_mutex> lock(*cLock);
511 OCHeaderOption options[MAX_HEADER_OPTIONS];
513 assembleHeaderOptions(options, headerOptions);
514 result = OCDoResource(&handle, OC_REST_PUT,
515 os.str().c_str(), nullptr,
516 assembleSetResourcePayload(rep).c_str(), connectivityType,
517 static_cast<OCQualityOfService>(QoS),
519 options, headerOptions.size());
524 result = OC_STACK_ERROR;
530 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
531 OCClientResponse* clientResponse)
533 ClientCallbackContext::DeleteContext* context =
534 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
535 HeaderOptions serverHeaderOptions;
537 if(clientResponse->result == OC_STACK_OK)
539 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
541 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
543 return OC_STACK_DELETE_TRANSACTION;
546 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
547 const std::string& uri, OCConnectivityType connectivityType,
548 const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
550 OCStackResult result;
551 OCCallbackData cbdata = {0};
553 ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
554 ctx->callback = callback;
555 cbdata.cb = &deleteResourceCallback;
556 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
557 cbdata.context = static_cast<void*>(ctx);
562 auto cLock = m_csdkLock.lock();
566 OCHeaderOption options[MAX_HEADER_OPTIONS];
568 assembleHeaderOptions(options, headerOptions);
570 std::lock_guard<std::recursive_mutex> lock(*cLock);
572 result = OCDoResource(nullptr, OC_REST_DELETE,
573 os.str().c_str(), nullptr,
574 nullptr, connectivityType,
575 static_cast<OCQualityOfService>(m_cfg.QoS),
576 &cbdata, options, headerOptions.size());
581 result = OC_STACK_ERROR;
587 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
588 OCClientResponse* clientResponse)
590 ClientCallbackContext::ObserveContext* context =
591 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
592 OCRepresentation attrs;
593 HeaderOptions serverHeaderOptions;
594 uint32_t sequenceNumber = clientResponse->sequenceNumber;
595 OCStackResult result = clientResponse->result;
596 if(clientResponse->result == OC_STACK_OK)
598 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
601 attrs = parseGetSetCallback(clientResponse);
603 catch(OC::OCException& e)
608 std::thread exec(context->callback, serverHeaderOptions, attrs,
609 result, sequenceNumber);
611 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
613 return OC_STACK_DELETE_TRANSACTION;
615 return OC_STACK_KEEP_TRANSACTION;
618 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
619 const std::string& host, const std::string& uri, OCConnectivityType connectivityType,
620 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
621 ObserveCallback& callback, QualityOfService QoS)
623 OCStackResult result;
624 OCCallbackData cbdata = {0};
626 ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
627 ctx->callback = callback;
628 cbdata.context = static_cast<void*>(ctx);
629 cbdata.cb = &observeResourceCallback;
630 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
633 if (observeType == ObserveType::Observe)
635 method = OC_REST_OBSERVE;
637 else if (observeType == ObserveType::ObserveAll)
639 method = OC_REST_OBSERVE_ALL;
643 method = OC_REST_OBSERVE_ALL;
646 auto cLock = m_csdkLock.lock();
650 std::ostringstream os;
651 os << host << assembleSetResourceUri(uri, queryParams).c_str();
653 std::lock_guard<std::recursive_mutex> lock(*cLock);
654 OCHeaderOption options[MAX_HEADER_OPTIONS];
656 assembleHeaderOptions(options, headerOptions);
657 result = OCDoResource(handle, method,
658 os.str().c_str(), nullptr,
659 nullptr, connectivityType,
660 static_cast<OCQualityOfService>(QoS),
662 options, headerOptions.size());
667 return OC_STACK_ERROR;
673 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
674 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
675 QualityOfService QoS)
677 OCStackResult result;
678 auto cLock = m_csdkLock.lock();
682 std::lock_guard<std::recursive_mutex> lock(*cLock);
683 OCHeaderOption options[MAX_HEADER_OPTIONS];
685 assembleHeaderOptions(options, headerOptions);
686 result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
687 headerOptions.size());
691 result = OC_STACK_ERROR;
697 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
698 OCClientResponse* clientResponse)
707 if(OCDevAddrToIPv4Addr(clientResponse->addr, &a, &b, &c, &d) == 0 &&
708 OCDevAddrToPort(clientResponse->addr, &port) == 0)
710 os<<static_cast<int>(a)<<"."<<static_cast<int>(b)<<"."<<static_cast<int>(c)
711 <<"."<<static_cast<int>(d)<<":"<<static_cast<int>(port);
713 ClientCallbackContext::SubscribePresenceContext* context =
714 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
716 std::thread exec(context->callback, clientResponse->result,
717 clientResponse->sequenceNumber, os.str());
723 oclog() << "subscribePresenceCallback(): OCDevAddrToIPv4Addr() or OCDevAddrToPort() "
724 <<"failed"<< std::flush;
726 return OC_STACK_KEEP_TRANSACTION;
729 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
730 const std::string& host, const std::string& resourceType,
731 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
733 OCCallbackData cbdata = {0};
735 ClientCallbackContext::SubscribePresenceContext* ctx =
736 new ClientCallbackContext::SubscribePresenceContext();
737 ctx->callback = presenceHandler;
738 cbdata.cb = &subscribePresenceCallback;
739 cbdata.context = static_cast<void*>(ctx);
740 cbdata.cd = [](void* c)
741 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
742 auto cLock = m_csdkLock.lock();
744 std::ostringstream os;
745 os << host << "/oc/presence";
747 if(!resourceType.empty())
749 os << "?rt=" << resourceType;
755 return OC_STACK_ERROR;
758 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
759 connectivityType, OC_LOW_QOS, &cbdata, NULL, 0);
762 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
764 OCStackResult result;
765 auto cLock = m_csdkLock.lock();
769 std::lock_guard<std::recursive_mutex> lock(*cLock);
770 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
774 result = OC_STACK_ERROR;
780 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
786 void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
787 const HeaderOptions& headerOptions)
791 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
793 options[i].protocolID = OC_COAP_ID;
794 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
795 options[i].optionLength = (it->getOptionData()).length() + 1;
796 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
797 (it->getOptionData()).length() + 1);