1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
// Constructs the in-process client wrapper from the shared C SDK lock (kept as a
// weak_ptr) and the platform configuration.
//
// Visible behavior: when m_cfg.mode is ModeType::Client, the server and client
// connectivity values are masked with CT_MASK_FLAGS and the stack is initialized with
// OCInit1(OC_CLIENT, ...). If that result is not OC_STACK_OK an InitializeException
// (STACK_INIT_ERROR) is thrown. m_listeningThread is then assigned a thread that runs
// listeningFunc.
//
// NOTE(review): the embedded line numbers jump (33 -> 36, 49 -> 53), so the end of the
// member-initializer list, the braces and at least one statement are not part of this
// listing. m_cfg is presumably initialized from cfg on an omitted line, and the thread
// start is presumably inside the Client branch -- confirm against the full file.
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
// Keep only the transport-flag bits of each configured connectivity value.
41 OCTransportFlags serverFlags =
42 static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
43 OCTransportFlags clientFlags =
44 static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
45 OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
// Stack initialization failure is reported to the caller as an exception.
47 if(OC_STACK_OK != result)
49 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
53 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Destructor. Joins m_listeningThread when m_threadRun is set and the thread is
// joinable, then performs a Client-mode-only step.
//
// NOTE(review): original lines 60-61 and 68-70 are not in this listing. The existing
// comment says the Client branch stops the stack, and m_threadRun is presumably cleared
// before the join so the listening loop can exit -- confirm both against the full file.
57 InProcClientWrapper::~InProcClientWrapper()
59 if(m_threadRun && m_listeningThread.joinable())
62 m_listeningThread.join();
65 // only stop if we are the ones who actually called 'init'. We are counting
66 // on the server to do the stop.
67 if(m_cfg.mode == ModeType::Client)
// Entry point of the listening thread started by the constructor.
//
// Visible behavior: promotes the weak C SDK lock to a shared_ptr, holds the recursive
// mutex through a lock_guard, assigns OC_STACK_ERROR to result on one path, checks
// result against OC_STACK_OK (failure handling is still a TODO), and sleeps 10 ms.
//
// NOTE(review): the loop construct, the declaration of result and the stack call made
// while the lock is held are on lines omitted from this listing (74-77, 79-80, 82-85).
// The OC_STACK_ERROR assignment is presumably the "lock could not be obtained" branch
// -- confirm.
73 void InProcClientWrapper::listeningFunc()
78 auto cLock = m_csdkLock.lock();
81 std::lock_guard<std::recursive_mutex> lock(*cLock);
86 result = OC_STACK_ERROR;
89 if(result != OC_STACK_OK)
91 // TODO: do something with result if failed?
94 // To minimize CPU utilization we may wish to do this with sleep
95 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Converts the JSON payload of a client response into an OCRepresentation.
//
// Returns a default-constructed OCRepresentation when the payload pointer is null, the
// payload is an empty string, or the parsed container holds no representations.
// Otherwise the first parsed representation becomes the root and the representations
// visited by the for_each below are attached to it as children.
//
// Throws OCException(INVALID_REPRESENTATION, OC_STACK_INVALID_JSON) after logging when
// setJSONRepresentation raises cereal::RapidJSONException or cereal::Exception.
//
// NOTE(review): the declaration of `oc`, the try keyword and the final return are on
// omitted lines. The for_each starts at `it`; unless omitted line 132 advances the
// iterator past the root, the root would be added as a child of itself -- confirm.
// NOTE(review): clientResponse itself is dereferenced without a null check.
99 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
// Nothing to parse: null or empty payload.
101 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
103 return OCRepresentation();
109 oc.setJSONRepresentation(clientResponse->resJSONPayload);
// Both parser exception types are logged with the offending payload and translated
// into the same OCException.
111 catch (cereal::RapidJSONException& ex)
113 oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
114 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
115 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
117 catch (cereal::Exception& ex)
119 oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
120 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
121 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
124 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
// No representation was parsed at all.
125 if(it == oc.representations().end())
127 return OCRepresentation();
130 // first one is considered the root, everything else is considered a child of this one.
131 OCRepresentation root = *it;
134 std::for_each(it, oc.representations().end(),
135 [&root](const OCRepresentation& repItr)
136 {root.addChild(repItr);});
// Stack callback for resource discovery responses; ctx is the ListenContext created in
// ListenForResource.
//
// Visible behavior: a response whose result is not OC_STACK_OK is logged and ignored.
// The weak client-wrapper pointer in the context is promoted; failure to do so is
// logged and the response ignored. Otherwise the JSON payload is streamed into
// requestStream, a ListenOCContainer is built, and one std::thread per discovered
// resource is created to invoke context->callback. A std::exception raised while
// parsing is logged. Every visible path returns OC_STACK_KEEP_TRANSACTION.
//
// NOTE(review): what happens to each `exec` thread (detach or join) is on an omitted
// line (177). A std::thread destroyed while still joinable calls std::terminate --
// confirm the thread is detached or joined.
// NOTE(review): `for(auto resource : ...)` copies each element of Resources() per
// iteration; the copy is then copied again into the thread's argument storage.
141 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
142 OCClientResponse* clientResponse)
144 ClientCallbackContext::ListenContext* context =
145 static_cast<ClientCallbackContext::ListenContext*>(ctx);
// Failed responses are only logged; the discovery transaction stays alive.
147 if(clientResponse->result != OC_STACK_OK)
149 oclog() << "listenCallback(): failed to create resource. clientResponse: "
150 << clientResponse->result
153 return OC_STACK_KEEP_TRANSACTION;
// The wrapper may already be gone; weak_ptr::lock() yields an empty pointer then.
156 auto clientWrapper = context->clientWrapper.lock();
160 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
162 return OC_STACK_KEEP_TRANSACTION;
165 std::stringstream requestStream;
166 requestStream << clientResponse->resJSONPayload;
171 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
173 // loop to ensure valid construction of all resources
174 for(auto resource : container.Resources())
176 std::thread exec(context->callback, resource);
181 catch(const std::exception& e)
183 oclog() << "listenCallback failed to parse a malformed message: "
186 << clientResponse->resJSONPayload
188 << clientResponse->result
190 return OC_STACK_KEEP_TRANSACTION;
193 return OC_STACK_KEEP_TRANSACTION;
// Starts resource discovery: issues an OC_REST_DISCOVER request for resourceType over
// connectivityType with the requested QoS, registering a callback context that owns a
// copy of `callback` and a shared reference to this wrapper.
//
// Returns OC_STACK_INVALID_PARAM from an early guard, OC_STACK_ERROR on the fallback
// path, otherwise the value produced by OCDoResource (the final return is on an
// omitted line).
//
// NOTE(review): the guard condition (original lines 202-203) is not in this listing;
// presumably it rejects an empty callback -- confirm. The OC_STACK_ERROR assignment is
// presumably the branch taken when the C SDK lock cannot be obtained -- confirm.
// NOTE(review): `context` is heap-allocated before the lock is taken and is released
// only through the deleter handed to OCCallbackData. If OCDoResource is never reached,
// nothing visible here frees it -- possible leak, confirm.
196 OCStackResult InProcClientWrapper::ListenForResource(
197 const std::string& serviceUrl, // unused
198 const std::string& resourceType,
199 OCConnectivityType connectivityType,
200 FindCallback& callback, QualityOfService QoS)
204 return OC_STACK_INVALID_PARAM;
207 OCStackResult result;
// shared_from_this() gives the context a reference to this wrapper for later use in
// the callback.
209 ClientCallbackContext::ListenContext* context =
210 new ClientCallbackContext::ListenContext(callback, shared_from_this());
211 OCCallbackData cbdata(
212 static_cast<void*>(context),
// Deleter passed alongside the context pointer; it casts back and deletes.
214 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
217 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
220 std::lock_guard<std::recursive_mutex> lock(*cLock);
221 result = OCDoResource(nullptr, OC_REST_DISCOVER,
222 resourceType.c_str(),
223 nullptr, nullptr, connectivityType,
224 static_cast<OCQualityOfService>(QoS),
231 result = OC_STACK_ERROR;
// Stack callback for device discovery responses; ctx is the DeviceListenContext
// created in ListenForDevice.
//
// Parses the response with parseGetSetCallback and creates a std::thread that invokes
// context->callback with the resulting representation. An OC::OCException from parsing
// is logged and the response is dropped. Returns OC_STACK_KEEP_TRANSACTION.
//
// NOTE(review): the handling of `exec` after construction (detach or join) is on an
// omitted line (246); a joinable std::thread must not be destroyed -- confirm.
236 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
237 OCClientResponse* clientResponse)
239 ClientCallbackContext::DeviceListenContext* context =
240 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
244 OCRepresentation rep = parseGetSetCallback(clientResponse);
245 std::thread exec(context->callback, rep);
248 catch(OC::OCException& e)
250 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
251 <<e.what() <<std::flush;
254 return OC_STACK_KEEP_TRANSACTION;
// Starts device discovery: issues an OC_REST_DISCOVER request over connectivityType
// with the requested QoS, using listenDeviceCallback and a DeviceListenContext that
// holds `callback` and a shared reference to this wrapper.
//
// Returns OC_STACK_INVALID_PARAM from an early guard, OC_STACK_ERROR on the fallback
// path, otherwise the value produced by OCDoResource (the final return is on an
// omitted line).
//
// NOTE(review): the guard condition (lines 264-265) and the URI argument passed to
// OCDoResource (line 283, presumably derived from deviceURI) are not in this listing
// -- confirm.
// NOTE(review): `context` is allocated before the lock is taken; if OCDoResource is
// never reached nothing visible here frees it -- possible leak, confirm.
257 OCStackResult InProcClientWrapper::ListenForDevice(
258 const std::string& serviceUrl, // unused
259 const std::string& deviceURI,
260 OCConnectivityType connectivityType,
261 FindDeviceCallback& callback,
262 QualityOfService QoS)
266 return OC_STACK_INVALID_PARAM;
268 OCStackResult result;
270 ClientCallbackContext::DeviceListenContext* context =
271 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
272 OCCallbackData cbdata(
273 static_cast<void*>(context),
274 listenDeviceCallback,
// Deleter passed alongside the context pointer; it casts back and deletes.
275 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
278 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
281 std::lock_guard<std::recursive_mutex> lock(*cLock);
282 result = OCDoResource(nullptr, OC_REST_DISCOVER,
284 nullptr, nullptr, connectivityType,
285 static_cast<OCQualityOfService>(QoS),
292 result = OC_STACK_ERROR;
// Copies the vendor-specific header options received in clientResponse into
// serverHeaderOptions, appending one HeaderOption::OCHeaderOption (id + data string)
// per received option. On the alternative path it prints " Invalid response " to
// std::cout.
//
// NOTE(review): the condition selecting between the two paths (lines 299-301,
// presumably a null check on clientResponse) and the declaration of optionID are on
// omitted lines -- confirm.
// NOTE(review): optionData is converted through a const char*, so the std::string
// copy stops at the first NUL byte and relies on one being present. No length field
// is consulted on the visible lines -- confirm the buffer is always NUL-terminated.
297 void parseServerHeaderOptions(OCClientResponse* clientResponse,
298 HeaderOptions& serverHeaderOptions)
302 // Parse header options from server
304 std::string optionData;
306 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
308 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
309 optionData = reinterpret_cast<const char*>
310 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
311 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
312 serverHeaderOptions.push_back(headerOption);
317 // clientResponse is invalid
318 // TODO check proper logging
319 std::cout << " Invalid response " << std::endl;
// Stack callback for GET responses; ctx is the GetContext created in
// GetResourceRepresentation.
//
// When the response result is OC_STACK_OK the server header options and the
// representation are parsed. A std::thread is then created to invoke
// context->callback with (header options, representation, result); on a non-OK result
// the options and representation are left default-constructed. Returns
// OC_STACK_DELETE_TRANSACTION.
//
// NOTE(review): the body of the OCException handler (lines 340-343) and the handling
// of `exec` (line 346) are not in this listing -- confirm how parse failures are
// reported and that the thread is detached or joined.
323 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
324 OCClientResponse* clientResponse)
326 ClientCallbackContext::GetContext* context =
327 static_cast<ClientCallbackContext::GetContext*>(ctx);
329 OCRepresentation rep;
330 HeaderOptions serverHeaderOptions;
331 OCStackResult result = clientResponse->result;
332 if(result == OC_STACK_OK)
334 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
337 rep = parseGetSetCallback(clientResponse);
339 catch(OC::OCException& e)
345 std::thread exec(context->callback, serverHeaderOptions, rep, result);
347 return OC_STACK_DELETE_TRANSACTION;
// Sends an OC_REST_GET request. The request URI is built from resourceUri and
// queryParams by assembleSetResourceUri; headerOptions are converted into a local
// OCHeaderOption array by assembleHeaderOptions; the response is delivered through a
// heap-allocated GetContext holding `callback`.
//
// Returns OC_STACK_INVALID_PARAM from an early guard, OC_STACK_ERROR on the fallback
// path, otherwise the value produced by OCDoResource (the final return is on an
// omitted line).
//
// NOTE(review): the guard condition (lines 356-357), the callback function argument
// (line 365) and several OCDoResource arguments (lines 380-382, 384) are not in this
// listing.
// NOTE(review): the option count passed is headerOptions.size() while the buffer
// holds MAX_HEADER_OPTIONS entries; no bound check is visible here -- confirm
// assembleHeaderOptions or the callers enforce the limit.
// NOTE(review): `ctx` is allocated before the lock is taken; if OCDoResource is never
// reached nothing visible here frees it -- possible leak, confirm.
350 OCStackResult InProcClientWrapper::GetResourceRepresentation(
351 const OCDevAddr& devAddr,
352 const std::string& resourceUri,
353 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
354 GetCallback& callback, QualityOfService QoS)
358 return OC_STACK_INVALID_PARAM;
360 OCStackResult result;
361 ClientCallbackContext::GetContext* ctx =
362 new ClientCallbackContext::GetContext(callback);
363 OCCallbackData cbdata(
364 static_cast<void*>(ctx),
// Deleter passed alongside the context pointer; it casts back and deletes.
366 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
369 std::string uri = assembleSetResourceUri(resourceUri, queryParams);
371 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
375 std::lock_guard<std::recursive_mutex> lock(*cLock);
376 OCHeaderOption options[MAX_HEADER_OPTIONS];
378 result = OCDoResource(
379 nullptr, OC_REST_GET,
383 static_cast<OCQualityOfService>(QoS),
385 assembleHeaderOptions(options, headerOptions),
386 headerOptions.size());
391 result = OC_STACK_ERROR;
// Stack callback shared by PUT and POST responses; ctx is a SetContext.
//
// When the response result is OC_STACK_OK, OC_STACK_RESOURCE_CREATED or
// OC_STACK_RESOURCE_DELETED the server header options and the representation are
// parsed. A std::thread is then created to invoke context->callback with
// (header options, representation, result). Returns OC_STACK_DELETE_TRANSACTION.
//
// NOTE(review): the body of the OCException handler (lines 416-419) and the handling
// of `exec` (line 422) are not in this listing -- confirm how parse failures are
// reported and that the thread is detached or joined.
397 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
398 OCClientResponse* clientResponse)
400 ClientCallbackContext::SetContext* context =
401 static_cast<ClientCallbackContext::SetContext*>(ctx);
402 OCRepresentation attrs;
403 HeaderOptions serverHeaderOptions;
405 OCStackResult result = clientResponse->result;
// Three result codes are treated as success for the purpose of parsing the response.
406 if (OC_STACK_OK == result ||
407 OC_STACK_RESOURCE_CREATED == result ||
408 OC_STACK_RESOURCE_DELETED == result)
410 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
413 attrs = parseGetSetCallback(clientResponse);
415 catch(OC::OCException& e)
421 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
423 return OC_STACK_DELETE_TRANSACTION;
// Builds the request URI string from a base URI and a map of query parameters.
//
// Visible behavior: one trailing '/' is removed from uri; every parameter is written
// as "key=value;" into a stream; one trailing ';' is removed from the resulting query
// string; the query string is appended to uri.
//
// NOTE(review): original line 437 (inside the `queryParams.size() > 0` check) is not
// in this listing; it presumably emits the query-string prefix -- confirm. The return
// statement is also on an omitted line.
// NOTE(review): std::string::back() on an empty string is undefined behavior.
// `uri.back()` is reached for an empty uri, and `queryString.back()` appears to be
// reached with an empty string when queryParams is empty -- consider guarding both
// with !empty().
// NOTE(review): `ostringstream` is unqualified here, so it depends on a using
// declaration/directive that is not visible in this listing.
426 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
427 const QueryParamsMap& queryParams)
429 if(uri.back() == '/')
431 uri.resize(uri.size()-1);
434 ostringstream paramsList;
435 if(queryParams.size() > 0)
440 for(auto& param : queryParams)
442 paramsList << param.first <<'='<<param.second<<';';
445 std::string queryString = paramsList.str();
// Drop the separator left behind by the last parameter.
446 if(queryString.back() == ';')
448 queryString.resize(queryString.size() - 1);
451 std::string ret = uri + queryString;
// Serializes a single representation into the JSON payload string used by PUT/POST:
// the representation is added to a fresh MessageContainer, which is then rendered
// with OCInfoFormat::IncludeOC.
455 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
457 MessageContainer ocInfo;
458 ocInfo.addRepresentation(rep);
459 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
// Sends an OC_REST_POST request to devAddr. The URI is built from uri and queryParams
// by assembleSetResourceUri, the payload is the JSON produced by
// assembleSetResourcePayload(rep), and headerOptions are converted by
// assembleHeaderOptions. The response is delivered through a heap-allocated SetContext
// holding `callback`.
//
// Returns OC_STACK_INVALID_PARAM from an early guard, OC_STACK_ERROR on the fallback
// path, otherwise the value produced by OCDoResource (the final return is on an
// omitted line).
//
// NOTE(review): the guard condition (lines 469-470), the callback function argument
// (line 477) and some OCDoResource arguments (lines 493, 495) are not in this listing.
// NOTE(review): the temporary payload string lives only until the end of the
// OCDoResource full-expression; this is safe only if the stack copies the payload
// during the call -- confirm.
// NOTE(review): `ctx` is allocated before the lock is taken; if OCDoResource is never
// reached nothing visible here frees it -- possible leak, confirm.
462 OCStackResult InProcClientWrapper::PostResourceRepresentation(
463 const OCDevAddr& devAddr,
464 const std::string& uri,
465 const OCRepresentation& rep,
466 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
467 PostCallback& callback, QualityOfService QoS)
471 return OC_STACK_INVALID_PARAM;
473 OCStackResult result;
474 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
475 OCCallbackData cbdata(
476 static_cast<void*>(ctx),
// Deleter passed alongside the context pointer; it casts back and deletes.
478 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
481 std::string url = assembleSetResourceUri(uri, queryParams);
483 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
487 std::lock_guard<std::recursive_mutex> lock(*cLock);
488 OCHeaderOption options[MAX_HEADER_OPTIONS];
490 result = OCDoResource(nullptr, OC_REST_POST,
491 url.c_str(), &devAddr,
492 assembleSetResourcePayload(rep).c_str(),
494 static_cast<OCQualityOfService>(QoS),
496 assembleHeaderOptions(options, headerOptions),
497 headerOptions.size());
502 result = OC_STACK_ERROR;
// Sends an OC_REST_PUT request to devAddr. The URI is built from uri and queryParams
// by assembleSetResourceUri, the payload is the JSON produced by
// assembleSetResourcePayload(rep), and headerOptions are converted by
// assembleHeaderOptions. The response is delivered through a heap-allocated SetContext
// holding `callback`.
//
// Returns OC_STACK_INVALID_PARAM from an early guard, OC_STACK_ERROR on the fallback
// path, otherwise the value produced by OCDoResource (the final return is on an
// omitted line).
//
// NOTE(review): the guard condition (lines 515-516), the callback function argument
// (line 523), the declaration of `handle` (line 534) and some OCDoResource arguments
// (lines 540, 542) are not in this listing.
// NOTE(review): `url` is built through `.c_str()`, which forces an extra copy and
// would truncate at an embedded NUL; PostResourceRepresentation assigns the
// std::string directly -- consider making the two consistent.
// NOTE(review): `ctx` is allocated before the lock is taken; if OCDoResource is never
// reached nothing visible here frees it -- possible leak, confirm.
508 OCStackResult InProcClientWrapper::PutResourceRepresentation(
509 const OCDevAddr& devAddr,
510 const std::string& uri,
511 const OCRepresentation& rep,
512 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
513 PutCallback& callback, QualityOfService QoS)
517 return OC_STACK_INVALID_PARAM;
519 OCStackResult result;
520 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
521 OCCallbackData cbdata(
522 static_cast<void*>(ctx),
// Deleter passed alongside the context pointer; it casts back and deletes.
524 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
527 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
529 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
533 std::lock_guard<std::recursive_mutex> lock(*cLock);
535 OCHeaderOption options[MAX_HEADER_OPTIONS];
537 result = OCDoResource(&handle, OC_REST_PUT,
538 url.c_str(), &devAddr,
539 assembleSetResourcePayload(rep).c_str(),
541 static_cast<OCQualityOfService>(QoS),
543 assembleHeaderOptions(options, headerOptions),
544 headerOptions.size());
549 result = OC_STACK_ERROR;
// Stack callback for DELETE responses; ctx is the DeleteContext created in
// DeleteResource.
//
// Server header options are parsed only when the response result is OC_STACK_OK. A
// std::thread is then created to invoke context->callback with (header options,
// response result). Returns OC_STACK_DELETE_TRANSACTION.
//
// NOTE(review): the handling of `exec` (line 567, detach or join) is not in this
// listing; a joinable std::thread must not be destroyed -- confirm.
555 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
556 OCClientResponse* clientResponse)
558 ClientCallbackContext::DeleteContext* context =
559 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
560 HeaderOptions serverHeaderOptions;
562 if(clientResponse->result == OC_STACK_OK)
564 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
566 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
568 return OC_STACK_DELETE_TRANSACTION;
// Sends an OC_REST_DELETE request for uri to devAddr, converting headerOptions with
// assembleHeaderOptions and delivering the response through a heap-allocated
// DeleteContext holding `callback`.
//
// Returns OC_STACK_INVALID_PARAM from an early guard, OC_STACK_ERROR on the fallback
// path, otherwise the value produced by OCDoResource (the final return is on an
// omitted line).
//
// NOTE(review): the request is issued with m_cfg.QoS, not with the QoS parameter.
// Every other request method in this file forwards its QoS argument, so the parameter
// is effectively ignored here -- likely a bug, confirm and switch to `QoS`.
// NOTE(review): the guard condition (lines 576-577) and some OCDoResource arguments
// (lines 599-600, 602) are not in this listing.
// NOTE(review): `ctx` is allocated before the lock is taken; if OCDoResource is never
// reached nothing visible here frees it -- possible leak, confirm.
571 OCStackResult InProcClientWrapper::DeleteResource(
572 const OCDevAddr& devAddr,
573 const std::string& uri,
574 const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
578 return OC_STACK_INVALID_PARAM;
580 OCStackResult result;
581 ClientCallbackContext::DeleteContext* ctx =
582 new ClientCallbackContext::DeleteContext(callback);
583 OCCallbackData cbdata(
584 static_cast<void*>(ctx),
585 deleteResourceCallback,
// Deleter passed alongside the context pointer; it casts back and deletes.
586 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
589 auto cLock = m_csdkLock.lock();
593 OCHeaderOption options[MAX_HEADER_OPTIONS];
// The stack call is made while holding the shared recursive mutex.
595 std::lock_guard<std::recursive_mutex> lock(*cLock);
597 result = OCDoResource(nullptr, OC_REST_DELETE,
598 uri.c_str(), &devAddr,
601 static_cast<OCQualityOfService>(m_cfg.QoS),
603 assembleHeaderOptions(options, headerOptions),
604 headerOptions.size());
609 result = OC_STACK_ERROR;
// Stack callback for observe notifications; ctx is the ObserveContext created in
// ObserveResource.
//
// When the response result is OC_STACK_OK the server header options and the
// representation are parsed. A std::thread is then created to invoke
// context->callback with (header options, representation, result, sequence number).
// Returns OC_STACK_DELETE_TRANSACTION when the sequence number equals
// OC_OBSERVE_DEREGISTER and OC_STACK_KEEP_TRANSACTION otherwise.
//
// NOTE(review): the body of the OCException handler (lines 632-634) and the handling
// of `exec` (line 638) are not in this listing -- confirm how parse failures are
// reported and that the thread is detached or joined.
615 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
616 OCClientResponse* clientResponse)
618 ClientCallbackContext::ObserveContext* context =
619 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
620 OCRepresentation attrs;
621 HeaderOptions serverHeaderOptions;
622 uint32_t sequenceNumber = clientResponse->sequenceNumber;
623 OCStackResult result = clientResponse->result;
624 if(clientResponse->result == OC_STACK_OK)
626 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
629 attrs = parseGetSetCallback(clientResponse);
631 catch(OC::OCException& e)
636 std::thread exec(context->callback, serverHeaderOptions, attrs,
637 result, sequenceNumber);
// A deregistration notification ends the transaction; anything else keeps it open.
639 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
641 return OC_STACK_DELETE_TRANSACTION;
643 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation on uri at devAddr. observeType selects the request method
// (OC_REST_OBSERVE for ObserveType::Observe, OC_REST_OBSERVE_ALL otherwise). The URI
// is built by assembleSetResourceUri, headerOptions are converted by
// assembleHeaderOptions, and the stack's handle is written through `handle`.
// Notifications are delivered through a heap-allocated ObserveContext holding
// `callback`.
//
// Returns OC_STACK_INVALID_PARAM from an early guard and OC_STACK_ERROR directly on
// the fallback path; the normal return of `result` is on an omitted line.
//
// NOTE(review): the `else if (ObserveAll)` branch and the final fallback both assign
// OC_REST_OBSERVE_ALL, so the else-if is redundant as written -- confirm whether the
// fallback was meant to differ.
// NOTE(review): the guard condition (lines 652-653), the declaration of `method`
// (line 665) and some OCDoResource arguments (lines 691-692, 694) are not in this
// listing.
// NOTE(review): `url` is built through `.c_str()`, forcing an extra copy.
// NOTE(review): `ctx` is allocated before the lock is taken; the fallback `return
// OC_STACK_ERROR` leaves with nothing visible freeing it -- possible leak, confirm.
646 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
647 const OCDevAddr& devAddr,
648 const std::string& uri,
649 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
650 ObserveCallback& callback, QualityOfService QoS)
654 return OC_STACK_INVALID_PARAM;
656 OCStackResult result;
658 ClientCallbackContext::ObserveContext* ctx =
659 new ClientCallbackContext::ObserveContext(callback);
660 OCCallbackData cbdata(
661 static_cast<void*>(ctx),
662 observeResourceCallback,
// Deleter passed alongside the context pointer; it casts back and deletes.
663 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
// Map the C++ observe type onto the stack's request method.
667 if (observeType == ObserveType::Observe)
669 method = OC_REST_OBSERVE;
671 else if (observeType == ObserveType::ObserveAll)
673 method = OC_REST_OBSERVE_ALL;
677 method = OC_REST_OBSERVE_ALL;
680 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
682 auto cLock = m_csdkLock.lock();
// The stack call is made while holding the shared recursive mutex.
686 std::lock_guard<std::recursive_mutex> lock(*cLock);
687 OCHeaderOption options[MAX_HEADER_OPTIONS];
689 result = OCDoResource(handle, method,
690 url.c_str(), &devAddr,
693 static_cast<OCQualityOfService>(QoS),
695 assembleHeaderOptions(options, headerOptions),
696 headerOptions.size());
701 return OC_STACK_ERROR;
// Cancels an observation by calling OCCancel on `handle` with the requested QoS and
// the header options converted by assembleHeaderOptions, while holding the shared C
// SDK mutex. host and uri are unused. Assigns OC_STACK_ERROR on the fallback path.
//
// NOTE(review): the first parameter (line 708, presumably the OCDoHandle named
// `handle` used below), the lock check and the final return are on lines omitted from
// this listing -- confirm.
// NOTE(review): the option count passed is headerOptions.size() while the buffer
// holds MAX_HEADER_OPTIONS entries; no bound check is visible here.
707 OCStackResult InProcClientWrapper::CancelObserveResource(
709 const std::string& host, // unused
710 const std::string& uri, // unused
711 const HeaderOptions& headerOptions,
712 QualityOfService QoS)
714 OCStackResult result;
715 auto cLock = m_csdkLock.lock();
719 std::lock_guard<std::recursive_mutex> lock(*cLock);
720 OCHeaderOption options[MAX_HEADER_OPTIONS];
722 result = OCCancel(handle,
723 static_cast<OCQualityOfService>(QoS),
724 assembleHeaderOptions(options, headerOptions),
725 headerOptions.size());
729 result = OC_STACK_ERROR;
// Stack callback for presence notifications; ctx is the SubscribePresenceContext
// created in SubscribePresence.
//
// Creates a std::thread that invokes context->callback with (response result,
// sequence number, address string taken from clientResponse->devAddr.addr). Returns
// OC_STACK_KEEP_TRANSACTION.
//
// NOTE(review): the line numbered 742 belongs to a block comment whose opening and
// closing delimiters (lines 741, 743) are not in this listing. The handling of `exec`
// (detach or join) is also on an omitted line -- confirm.
735 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
736 OCClientResponse* clientResponse)
738 ClientCallbackContext::SubscribePresenceContext* context =
739 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
742 * This a hack while we rethink presence subscription.
744 std::string url = clientResponse->devAddr.addr;
746 std::thread exec(context->callback, clientResponse->result,
747 clientResponse->sequenceNumber, url);
751 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications from host. The request URI is host followed by
// OC_PRESENCE_URI, with "?rt=<resourceType>" appended when resourceType is not empty.
// The request is sent as OC_REST_PRESENCE at OC_LOW_QOS with no header options, and
// the stack's handle is written through `handle`. Notifications are delivered through
// a heap-allocated SubscribePresenceContext holding presenceHandler.
//
// Returns OC_STACK_INVALID_PARAM from an early guard, OC_STACK_ERROR from a second
// guard, otherwise the value returned by OCDoResource.
//
// NOTE(review): both guard conditions (lines 758-759 and 782-784) are not in this
// listing; the second is presumably the "lock unavailable" check -- confirm.
// NOTE(review): no lock_guard line appears in this listing for this function, unlike
// the other request methods -- confirm OCDoResource is called with the C SDK mutex
// held.
// NOTE(review): line 775 ends with a stray second ';' (an empty statement; harmless).
// NOTE(review): `ctx` is allocated before the OC_STACK_ERROR early return and nothing
// visible frees it on that path -- possible leak, confirm.
// NOTE(review): os.str().c_str() points into a temporary that lives only for the
// duration of the OCDoResource call expression -- safe only if the stack copies it.
754 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
755 const std::string& host, const std::string& resourceType,
756 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
760 return OC_STACK_INVALID_PARAM;
763 ClientCallbackContext::SubscribePresenceContext* ctx =
764 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
765 OCCallbackData cbdata(
766 static_cast<void*>(ctx),
767 subscribePresenceCallback,
769 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
772 auto cLock = m_csdkLock.lock();
774 std::ostringstream os;
775 os << host << OC_PRESENCE_URI;;
// Optional resource-type filter is appended as a query parameter.
777 if(!resourceType.empty())
779 os << "?rt=" << resourceType;
785 return OC_STACK_ERROR;
788 return OCDoResource(handle, OC_REST_PRESENCE,
789 os.str().c_str(), nullptr,
790 nullptr, connectivityType,
791 OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels a presence subscription by calling OCCancel on `handle` at OC_LOW_QOS with
// no header options, while holding the shared C SDK mutex. Assigns OC_STACK_ERROR on
// the fallback path.
//
// NOTE(review): the lock check, the else branch structure and the final return are on
// lines omitted from this listing; the OC_STACK_ERROR assignment is presumably the
// "lock unavailable" branch -- confirm.
794 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
796 OCStackResult result;
797 auto cLock = m_csdkLock.lock();
801 std::lock_guard<std::recursive_mutex> lock(*cLock);
802 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
806 result = OC_STACK_ERROR;
// Reports a default quality-of-service value through the reference parameter `qos`.
// NOTE(review): only the signature is present in this listing (body lines 813-816 are
// omitted); presumably it copies the configured QoS into `qos` -- confirm.
812 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
818 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
819 const HeaderOptions& headerOptions)
823 if( headerOptions.size() == 0)
828 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
830 options[i] = OCHeaderOption(OC_COAP_ID,
832 it->getOptionData().length() + 1,
833 reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));