1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
41 OCTransportFlags serverFlags =
42 static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
43 OCTransportFlags clientFlags =
44 static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
45 OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47 if(OC_STACK_OK != result)
49 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
53 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
57 InProcClientWrapper::~InProcClientWrapper()
59 if(m_threadRun && m_listeningThread.joinable())
62 m_listeningThread.join();
65 // only stop if we are the ones who actually called 'init'. We are counting
66 // on the server to do the stop.
67 if(m_cfg.mode == ModeType::Client)
73 void InProcClientWrapper::listeningFunc()
78 auto cLock = m_csdkLock.lock();
81 std::lock_guard<std::recursive_mutex> lock(*cLock);
86 result = OC_STACK_ERROR;
89 if(result != OC_STACK_OK)
91 // TODO: do something with result if failed?
94 // To minimize CPU utilization we may wish to do this with sleep
95 std::this_thread::sleep_for(std::chrono::milliseconds(10));
99 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
103 return OCRepresentation();
109 oc.setJSONRepresentation(clientResponse->resJSONPayload);
111 catch (cereal::RapidJSONException& ex)
113 oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
114 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
115 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
117 catch (cereal::Exception& ex)
119 oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
120 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
121 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
124 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
125 if(it == oc.representations().end())
127 return OCRepresentation();
130 // first one is considered the root, everything else is considered a child of this one.
131 OCRepresentation root = *it;
134 std::for_each(it, oc.representations().end(),
135 [&root](const OCRepresentation& repItr)
136 {root.addChild(repItr);});
141 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
142 OCClientResponse* clientResponse)
144 ClientCallbackContext::ListenContext* context =
145 static_cast<ClientCallbackContext::ListenContext*>(ctx);
147 if(clientResponse->result != OC_STACK_OK)
149 oclog() << "listenCallback(): failed to create resource. clientResponse: "
150 << clientResponse->result
153 return OC_STACK_KEEP_TRANSACTION;
156 auto clientWrapper = context->clientWrapper.lock();
160 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
162 return OC_STACK_KEEP_TRANSACTION;
165 std::stringstream requestStream;
166 requestStream << clientResponse->resJSONPayload;
171 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
173 // loop to ensure valid construction of all resources
174 for(auto resource : container.Resources())
176 std::thread exec(context->callback, resource);
181 catch(const std::exception& e)
183 oclog() << "listenCallback failed to parse a malformed message: "
186 << clientResponse->resJSONPayload
188 << clientResponse->result
190 return OC_STACK_KEEP_TRANSACTION;
193 return OC_STACK_KEEP_TRANSACTION;
196 OCStackResult InProcClientWrapper::ListenForResource(
197 const std::string& serviceUrl, // unused
198 const std::string& resourceType,
199 OCConnectivityType connectivityType,
200 FindCallback& callback, QualityOfService QoS)
204 return OC_STACK_INVALID_PARAM;
207 OCStackResult result;
209 ClientCallbackContext::ListenContext* context =
210 new ClientCallbackContext::ListenContext(callback, shared_from_this());
211 OCCallbackData cbdata(
212 static_cast<void*>(context),
214 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
217 auto cLock = m_csdkLock.lock();
220 std::lock_guard<std::recursive_mutex> lock(*cLock);
221 result = OCDoResource(nullptr, OC_REST_DISCOVER,
222 resourceType.c_str(),
223 nullptr, nullptr, connectivityType,
224 static_cast<OCQualityOfService>(QoS),
231 result = OC_STACK_ERROR;
236 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
237 OCClientResponse* clientResponse)
239 ClientCallbackContext::DeviceListenContext* context =
240 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
244 OCRepresentation rep = parseGetSetCallback(clientResponse);
245 std::thread exec(context->callback, rep);
248 catch(OC::OCException& e)
250 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
251 <<e.what() <<std::flush;
254 return OC_STACK_KEEP_TRANSACTION;
257 OCStackResult InProcClientWrapper::ListenForDevice(
258 const std::string& serviceUrl, // unused
259 const std::string& deviceURI,
260 OCConnectivityType connectivityType,
261 FindDeviceCallback& callback,
262 QualityOfService QoS)
266 return OC_STACK_INVALID_PARAM;
268 OCStackResult result;
270 ClientCallbackContext::DeviceListenContext* context =
271 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
272 OCCallbackData cbdata(
273 static_cast<void*>(context),
274 listenDeviceCallback,
275 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
278 auto cLock = m_csdkLock.lock();
281 std::lock_guard<std::recursive_mutex> lock(*cLock);
282 result = OCDoResource(nullptr, OC_REST_DISCOVER,
284 nullptr, nullptr, connectivityType,
285 static_cast<OCQualityOfService>(QoS),
292 result = OC_STACK_ERROR;
297 void parseServerHeaderOptions(OCClientResponse* clientResponse,
298 HeaderOptions& serverHeaderOptions)
302 // Parse header options from server
304 std::string optionData;
306 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
308 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
309 optionData = reinterpret_cast<const char*>
310 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
311 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
312 serverHeaderOptions.push_back(headerOption);
317 // clientResponse is invalid
318 // TODO check proper logging
319 std::cout << " Invalid response " << std::endl;
323 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
324 OCClientResponse* clientResponse)
326 ClientCallbackContext::GetContext* context =
327 static_cast<ClientCallbackContext::GetContext*>(ctx);
329 OCRepresentation rep;
330 HeaderOptions serverHeaderOptions;
331 OCStackResult result = clientResponse->result;
332 if(result == OC_STACK_OK)
334 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
337 rep = parseGetSetCallback(clientResponse);
339 catch(OC::OCException& e)
345 std::thread exec(context->callback, serverHeaderOptions, rep, result);
347 return OC_STACK_DELETE_TRANSACTION;
350 OCStackResult InvokeDoResource(OCDoHandle *handle,
352 const char *requestUri,
353 const OCDevAddr *destination,
355 OCConnectivityType connectivityType,
356 OCQualityOfService qos,
357 OCCallbackData *cbData,
358 OCHeaderOption *options,
365 host << destination << requestUri;
366 connectivityType = (OCConnectivityType)
367 ((destination->adapter << CT_ADAPTER_SHIFT)
368 | (destination->flags & CT_MASK_FLAGS));
369 return OCDoResource(handle,
382 return OCDoResource(handle,
395 OCStackResult InProcClientWrapper::GetResourceRepresentation(
396 const OCDevAddr& devAddr, bool useHostString,
397 const std::string& resourceUri,
398 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
399 GetCallback& callback, QualityOfService QoS)
403 return OC_STACK_INVALID_PARAM;
405 OCStackResult result;
406 ClientCallbackContext::GetContext* ctx =
407 new ClientCallbackContext::GetContext(callback);
408 OCCallbackData cbdata(
409 static_cast<void*>(ctx),
411 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
414 std::string uri = assembleSetResourceUri(resourceUri, queryParams);
416 auto cLock = m_csdkLock.lock();
420 std::lock_guard<std::recursive_mutex> lock(*cLock);
421 OCHeaderOption options[MAX_HEADER_OPTIONS];
423 result = OCDoResource(nullptr, OC_REST_GET,
427 static_cast<OCQualityOfService>(QoS),
429 assembleHeaderOptions(options, headerOptions),
430 headerOptions.size());
435 result = OC_STACK_ERROR;
441 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
442 OCClientResponse* clientResponse)
444 ClientCallbackContext::SetContext* context =
445 static_cast<ClientCallbackContext::SetContext*>(ctx);
446 OCRepresentation attrs;
447 HeaderOptions serverHeaderOptions;
449 OCStackResult result = clientResponse->result;
450 if (OC_STACK_OK == result ||
451 OC_STACK_RESOURCE_CREATED == result ||
452 OC_STACK_RESOURCE_DELETED == result)
454 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
457 attrs = parseGetSetCallback(clientResponse);
459 catch(OC::OCException& e)
465 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
467 return OC_STACK_DELETE_TRANSACTION;
470 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
471 const QueryParamsMap& queryParams)
473 if(uri.back() == '/')
475 uri.resize(uri.size()-1);
478 ostringstream paramsList;
479 if(queryParams.size() > 0)
484 for(auto& param : queryParams)
486 paramsList << param.first <<'='<<param.second<<';';
489 std::string queryString = paramsList.str();
490 if(queryString.back() == ';')
492 queryString.resize(queryString.size() - 1);
495 std::string ret = uri + queryString;
499 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
501 MessageContainer ocInfo;
502 ocInfo.addRepresentation(rep);
503 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
506 OCStackResult InProcClientWrapper::PostResourceRepresentation(
507 const OCDevAddr& devAddr, bool useHostString,
508 const std::string& uri,
509 const OCRepresentation& rep,
510 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
511 PostCallback& callback, QualityOfService QoS)
515 return OC_STACK_INVALID_PARAM;
517 OCStackResult result;
518 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
519 OCCallbackData cbdata(
520 static_cast<void*>(ctx),
522 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
525 std::string url = assembleSetResourceUri(uri, queryParams);
527 auto cLock = m_csdkLock.lock();
531 std::lock_guard<std::recursive_mutex> lock(*cLock);
532 OCHeaderOption options[MAX_HEADER_OPTIONS];
534 result = OCDoResource(nullptr, OC_REST_POST,
535 url.c_str(), &devAddr,
536 assembleSetResourcePayload(rep).c_str(),
538 static_cast<OCQualityOfService>(QoS),
540 assembleHeaderOptions(options, headerOptions),
541 headerOptions.size());
546 result = OC_STACK_ERROR;
552 OCStackResult InProcClientWrapper::PutResourceRepresentation(
553 const OCDevAddr& devAddr, bool useHostString,
554 const std::string& uri,
555 const OCRepresentation& rep,
556 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
557 PutCallback& callback, QualityOfService QoS)
561 return OC_STACK_INVALID_PARAM;
563 OCStackResult result;
564 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
565 OCCallbackData cbdata(
566 static_cast<void*>(ctx),
568 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
571 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
573 auto cLock = m_csdkLock.lock();
577 std::lock_guard<std::recursive_mutex> lock(*cLock);
579 OCHeaderOption options[MAX_HEADER_OPTIONS];
581 result = OCDoResource(&handle, OC_REST_PUT,
582 url.c_str(), &devAddr,
583 assembleSetResourcePayload(rep).c_str(),
585 static_cast<OCQualityOfService>(QoS),
587 assembleHeaderOptions(options, headerOptions),
588 headerOptions.size());
593 result = OC_STACK_ERROR;
599 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
600 OCClientResponse* clientResponse)
602 ClientCallbackContext::DeleteContext* context =
603 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
604 HeaderOptions serverHeaderOptions;
606 if(clientResponse->result == OC_STACK_OK)
608 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
610 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
612 return OC_STACK_DELETE_TRANSACTION;
615 OCStackResult InProcClientWrapper::DeleteResource(
616 const OCDevAddr& devAddr, bool useHostString,
617 const std::string& uri,
618 const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
622 return OC_STACK_INVALID_PARAM;
624 OCStackResult result;
625 ClientCallbackContext::DeleteContext* ctx =
626 new ClientCallbackContext::DeleteContext(callback);
627 OCCallbackData cbdata(
628 static_cast<void*>(ctx),
629 deleteResourceCallback,
630 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
633 auto cLock = m_csdkLock.lock();
637 OCHeaderOption options[MAX_HEADER_OPTIONS];
639 std::lock_guard<std::recursive_mutex> lock(*cLock);
641 result = OCDoResource(nullptr, OC_REST_DELETE,
642 uri.c_str(), &devAddr,
645 static_cast<OCQualityOfService>(m_cfg.QoS),
647 assembleHeaderOptions(options, headerOptions),
648 headerOptions.size());
653 result = OC_STACK_ERROR;
659 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
660 OCClientResponse* clientResponse)
662 ClientCallbackContext::ObserveContext* context =
663 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
664 OCRepresentation attrs;
665 HeaderOptions serverHeaderOptions;
666 uint32_t sequenceNumber = clientResponse->sequenceNumber;
667 OCStackResult result = clientResponse->result;
668 if(clientResponse->result == OC_STACK_OK)
670 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
673 attrs = parseGetSetCallback(clientResponse);
675 catch(OC::OCException& e)
680 std::thread exec(context->callback, serverHeaderOptions, attrs,
681 result, sequenceNumber);
683 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
685 return OC_STACK_DELETE_TRANSACTION;
687 return OC_STACK_KEEP_TRANSACTION;
690 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
691 const OCDevAddr& devAddr, bool useHostString,
692 const std::string& uri,
693 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
694 ObserveCallback& callback, QualityOfService QoS)
698 return OC_STACK_INVALID_PARAM;
700 OCStackResult result;
702 ClientCallbackContext::ObserveContext* ctx =
703 new ClientCallbackContext::ObserveContext(callback);
704 OCCallbackData cbdata(
705 static_cast<void*>(ctx),
706 observeResourceCallback,
707 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
711 if (observeType == ObserveType::Observe)
713 method = OC_REST_OBSERVE;
715 else if (observeType == ObserveType::ObserveAll)
717 method = OC_REST_OBSERVE_ALL;
721 method = OC_REST_OBSERVE_ALL;
724 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
726 auto cLock = m_csdkLock.lock();
730 std::lock_guard<std::recursive_mutex> lock(*cLock);
731 OCHeaderOption options[MAX_HEADER_OPTIONS];
733 result = OCDoResource(handle, method,
734 url.c_str(), &devAddr,
737 static_cast<OCQualityOfService>(QoS),
739 assembleHeaderOptions(options, headerOptions),
740 headerOptions.size());
745 return OC_STACK_ERROR;
751 OCStackResult InProcClientWrapper::CancelObserveResource(
753 const std::string& host, // unused
754 const std::string& uri, // unused
755 const HeaderOptions& headerOptions,
756 QualityOfService QoS)
758 OCStackResult result;
759 auto cLock = m_csdkLock.lock();
763 std::lock_guard<std::recursive_mutex> lock(*cLock);
764 OCHeaderOption options[MAX_HEADER_OPTIONS];
766 result = OCCancel(handle,
767 static_cast<OCQualityOfService>(QoS),
768 assembleHeaderOptions(options, headerOptions),
769 headerOptions.size());
773 result = OC_STACK_ERROR;
779 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
780 OCClientResponse* clientResponse)
782 ClientCallbackContext::SubscribePresenceContext* context =
783 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
786 * This a hack while we rethink presence subscription.
788 std::string url = clientResponse->devAddr.addr;
790 std::thread exec(context->callback, clientResponse->result,
791 clientResponse->sequenceNumber, url);
795 return OC_STACK_KEEP_TRANSACTION;
798 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
799 const std::string& host, const std::string& resourceType,
800 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
804 return OC_STACK_INVALID_PARAM;
807 ClientCallbackContext::SubscribePresenceContext* ctx =
808 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
809 OCCallbackData cbdata(
810 static_cast<void*>(ctx),
811 subscribePresenceCallback,
813 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
816 auto cLock = m_csdkLock.lock();
818 std::ostringstream os;
819 os << host << "/oc/presence";
821 if(!resourceType.empty())
823 os << "?rt=" << resourceType;
829 return OC_STACK_ERROR;
832 return OCDoResource(handle, OC_REST_PRESENCE,
833 os.str().c_str(), nullptr,
834 nullptr, connectivityType,
835 OC_LOW_QOS, &cbdata, NULL, 0);
838 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
840 OCStackResult result;
841 auto cLock = m_csdkLock.lock();
845 std::lock_guard<std::recursive_mutex> lock(*cLock);
846 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
850 result = OC_STACK_ERROR;
856 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
862 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
863 const HeaderOptions& headerOptions)
867 if( headerOptions.size() == 0)
872 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
874 options[i] = OCHeaderOption(OC_COAP_ID,
876 it->getOptionData().length() + 1,
877 reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));