1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
41 OCTransportFlags serverFlags =
42 static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
43 OCTransportFlags clientFlags =
44 static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
45 OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
47 if(OC_STACK_OK != result)
49 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
53 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
57 InProcClientWrapper::~InProcClientWrapper()
59 if(m_threadRun && m_listeningThread.joinable())
62 m_listeningThread.join();
65 // only stop if we are the ones who actually called 'init'. We are counting
66 // on the server to do the stop.
67 if(m_cfg.mode == ModeType::Client)
73 void InProcClientWrapper::listeningFunc()
78 auto cLock = m_csdkLock.lock();
81 std::lock_guard<std::recursive_mutex> lock(*cLock);
86 result = OC_STACK_ERROR;
89 if(result != OC_STACK_OK)
91 // TODO: do something with result if failed?
94 // To minimize CPU utilization we may wish to do this with sleep
95 std::this_thread::sleep_for(std::chrono::milliseconds(10));
99 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
101 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
103 return OCRepresentation();
109 oc.setJSONRepresentation(clientResponse->resJSONPayload);
111 catch (cereal::RapidJSONException& ex)
113 oclog() <<"RapidJSON Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
114 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
115 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
117 catch (cereal::Exception& ex)
119 oclog() <<"Cereal Exception in parseGetSetCallback: "<<ex.what() <<std::endl<<
120 "Data was:"<< clientResponse->resJSONPayload<< ":" << std::flush;
121 throw OCException(OC::Exception::INVALID_REPRESENTATION, OC_STACK_INVALID_JSON);
124 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
125 if(it == oc.representations().end())
127 return OCRepresentation();
130 // first one is considered the root, everything else is considered a child of this one.
131 OCRepresentation root = *it;
134 std::for_each(it, oc.representations().end(),
135 [&root](const OCRepresentation& repItr)
136 {root.addChild(repItr);});
141 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
142 OCClientResponse* clientResponse)
144 ClientCallbackContext::ListenContext* context =
145 static_cast<ClientCallbackContext::ListenContext*>(ctx);
147 if(clientResponse->result != OC_STACK_OK)
149 oclog() << "listenCallback(): failed to create resource. clientResponse: "
150 << clientResponse->result
153 return OC_STACK_KEEP_TRANSACTION;
156 auto clientWrapper = context->clientWrapper.lock();
160 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
162 return OC_STACK_KEEP_TRANSACTION;
165 std::stringstream requestStream;
166 requestStream << clientResponse->resJSONPayload;
171 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
173 // loop to ensure valid construction of all resources
174 for(auto resource : container.Resources())
176 std::thread exec(context->callback, resource);
181 catch(const std::exception& e)
183 oclog() << "listenCallback failed to parse a malformed message: "
186 << clientResponse->resJSONPayload
188 << clientResponse->result
190 return OC_STACK_KEEP_TRANSACTION;
193 return OC_STACK_KEEP_TRANSACTION;
196 OCStackResult InProcClientWrapper::ListenForResource(
197 const std::string& serviceUrl, // unused
198 const std::string& resourceType,
199 OCConnectivityType connectivityType,
200 FindCallback& callback, QualityOfService QoS)
204 return OC_STACK_INVALID_PARAM;
207 OCStackResult result;
209 ClientCallbackContext::ListenContext* context =
210 new ClientCallbackContext::ListenContext(callback, shared_from_this());
211 OCCallbackData cbdata(
212 static_cast<void*>(context),
214 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
217 auto cLock = m_csdkLock.lock();
220 std::lock_guard<std::recursive_mutex> lock(*cLock);
221 result = OCDoResource(nullptr, OC_REST_DISCOVER,
222 resourceType.c_str(),
223 nullptr, nullptr, connectivityType,
224 static_cast<OCQualityOfService>(QoS),
231 result = OC_STACK_ERROR;
236 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
237 OCClientResponse* clientResponse)
239 ClientCallbackContext::DeviceListenContext* context =
240 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
244 OCRepresentation rep = parseGetSetCallback(clientResponse);
245 std::thread exec(context->callback, rep);
248 catch(OC::OCException& e)
250 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
251 <<e.what() <<std::flush;
254 return OC_STACK_KEEP_TRANSACTION;
257 OCStackResult InProcClientWrapper::ListenForDevice(
258 const std::string& serviceUrl, // unused
259 const std::string& deviceURI,
260 OCConnectivityType connectivityType,
261 FindDeviceCallback& callback,
262 QualityOfService QoS)
266 return OC_STACK_INVALID_PARAM;
268 OCStackResult result;
270 ClientCallbackContext::DeviceListenContext* context =
271 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
272 OCCallbackData cbdata(
273 static_cast<void*>(context),
274 listenDeviceCallback,
275 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
278 auto cLock = m_csdkLock.lock();
281 std::lock_guard<std::recursive_mutex> lock(*cLock);
282 result = OCDoResource(nullptr, OC_REST_DISCOVER,
284 nullptr, nullptr, connectivityType,
285 static_cast<OCQualityOfService>(QoS),
292 result = OC_STACK_ERROR;
297 void parseServerHeaderOptions(OCClientResponse* clientResponse,
298 HeaderOptions& serverHeaderOptions)
302 // Parse header options from server
304 std::string optionData;
306 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
308 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
309 optionData = reinterpret_cast<const char*>
310 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
311 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
312 serverHeaderOptions.push_back(headerOption);
317 // clientResponse is invalid
318 // TODO check proper logging
319 std::cout << " Invalid response " << std::endl;
323 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
324 OCClientResponse* clientResponse)
326 ClientCallbackContext::GetContext* context =
327 static_cast<ClientCallbackContext::GetContext*>(ctx);
329 OCRepresentation rep;
330 HeaderOptions serverHeaderOptions;
331 OCStackResult result = clientResponse->result;
332 if(result == OC_STACK_OK)
334 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
337 rep = parseGetSetCallback(clientResponse);
339 catch(OC::OCException& e)
345 std::thread exec(context->callback, serverHeaderOptions, rep, result);
347 return OC_STACK_DELETE_TRANSACTION;
350 OCStackResult InProcClientWrapper::GetResourceRepresentation(
351 const OCDevAddr& devAddr,
352 const std::string& resourceUri,
353 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
354 GetCallback& callback, QualityOfService QoS)
358 return OC_STACK_INVALID_PARAM;
360 OCStackResult result;
361 ClientCallbackContext::GetContext* ctx =
362 new ClientCallbackContext::GetContext(callback);
363 OCCallbackData cbdata(
364 static_cast<void*>(ctx),
366 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
369 std::string uri = assembleSetResourceUri(resourceUri, queryParams);
371 auto cLock = m_csdkLock.lock();
375 std::lock_guard<std::recursive_mutex> lock(*cLock);
376 OCHeaderOption options[MAX_HEADER_OPTIONS];
378 result = OCDoResource(nullptr, OC_REST_GET,
382 static_cast<OCQualityOfService>(QoS),
384 assembleHeaderOptions(options, headerOptions),
385 headerOptions.size());
390 result = OC_STACK_ERROR;
396 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
397 OCClientResponse* clientResponse)
399 ClientCallbackContext::SetContext* context =
400 static_cast<ClientCallbackContext::SetContext*>(ctx);
401 OCRepresentation attrs;
402 HeaderOptions serverHeaderOptions;
404 OCStackResult result = clientResponse->result;
405 if (OC_STACK_OK == result ||
406 OC_STACK_RESOURCE_CREATED == result ||
407 OC_STACK_RESOURCE_DELETED == result)
409 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
412 attrs = parseGetSetCallback(clientResponse);
414 catch(OC::OCException& e)
420 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
422 return OC_STACK_DELETE_TRANSACTION;
425 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
426 const QueryParamsMap& queryParams)
428 if(uri.back() == '/')
430 uri.resize(uri.size()-1);
433 ostringstream paramsList;
434 if(queryParams.size() > 0)
439 for(auto& param : queryParams)
441 paramsList << param.first <<'='<<param.second<<';';
444 std::string queryString = paramsList.str();
445 if(queryString.back() == ';')
447 queryString.resize(queryString.size() - 1);
450 std::string ret = uri + queryString;
454 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
456 MessageContainer ocInfo;
457 ocInfo.addRepresentation(rep);
458 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
461 OCStackResult InProcClientWrapper::PostResourceRepresentation(
462 const OCDevAddr& devAddr,
463 const std::string& uri,
464 const OCRepresentation& rep,
465 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
466 PostCallback& callback, QualityOfService QoS)
470 return OC_STACK_INVALID_PARAM;
472 OCStackResult result;
473 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
474 OCCallbackData cbdata(
475 static_cast<void*>(ctx),
477 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
480 std::string url = assembleSetResourceUri(uri, queryParams);
482 auto cLock = m_csdkLock.lock();
486 std::lock_guard<std::recursive_mutex> lock(*cLock);
487 OCHeaderOption options[MAX_HEADER_OPTIONS];
489 result = OCDoResource(nullptr, OC_REST_POST,
490 url.c_str(), &devAddr,
491 assembleSetResourcePayload(rep).c_str(),
493 static_cast<OCQualityOfService>(QoS),
495 assembleHeaderOptions(options, headerOptions),
496 headerOptions.size());
501 result = OC_STACK_ERROR;
507 OCStackResult InProcClientWrapper::PutResourceRepresentation(
508 const OCDevAddr& devAddr,
509 const std::string& uri,
510 const OCRepresentation& rep,
511 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
512 PutCallback& callback, QualityOfService QoS)
516 return OC_STACK_INVALID_PARAM;
518 OCStackResult result;
519 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
520 OCCallbackData cbdata(
521 static_cast<void*>(ctx),
523 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
526 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
528 auto cLock = m_csdkLock.lock();
532 std::lock_guard<std::recursive_mutex> lock(*cLock);
534 OCHeaderOption options[MAX_HEADER_OPTIONS];
536 result = OCDoResource(&handle, OC_REST_PUT,
537 url.c_str(), &devAddr,
538 assembleSetResourcePayload(rep).c_str(),
540 static_cast<OCQualityOfService>(QoS),
542 assembleHeaderOptions(options, headerOptions),
543 headerOptions.size());
548 result = OC_STACK_ERROR;
554 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
555 OCClientResponse* clientResponse)
557 ClientCallbackContext::DeleteContext* context =
558 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
559 HeaderOptions serverHeaderOptions;
561 if(clientResponse->result == OC_STACK_OK)
563 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
565 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
567 return OC_STACK_DELETE_TRANSACTION;
570 OCStackResult InProcClientWrapper::DeleteResource(
571 const OCDevAddr& devAddr,
572 const std::string& uri,
573 const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
577 return OC_STACK_INVALID_PARAM;
579 OCStackResult result;
580 ClientCallbackContext::DeleteContext* ctx =
581 new ClientCallbackContext::DeleteContext(callback);
582 OCCallbackData cbdata(
583 static_cast<void*>(ctx),
584 deleteResourceCallback,
585 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
588 auto cLock = m_csdkLock.lock();
592 OCHeaderOption options[MAX_HEADER_OPTIONS];
594 std::lock_guard<std::recursive_mutex> lock(*cLock);
596 result = OCDoResource(nullptr, OC_REST_DELETE,
597 uri.c_str(), &devAddr,
600 static_cast<OCQualityOfService>(m_cfg.QoS),
602 assembleHeaderOptions(options, headerOptions),
603 headerOptions.size());
608 result = OC_STACK_ERROR;
614 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
615 OCClientResponse* clientResponse)
617 ClientCallbackContext::ObserveContext* context =
618 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
619 OCRepresentation attrs;
620 HeaderOptions serverHeaderOptions;
621 uint32_t sequenceNumber = clientResponse->sequenceNumber;
622 OCStackResult result = clientResponse->result;
623 if(clientResponse->result == OC_STACK_OK)
625 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
628 attrs = parseGetSetCallback(clientResponse);
630 catch(OC::OCException& e)
635 std::thread exec(context->callback, serverHeaderOptions, attrs,
636 result, sequenceNumber);
638 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
640 return OC_STACK_DELETE_TRANSACTION;
642 return OC_STACK_KEEP_TRANSACTION;
645 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
646 const OCDevAddr& devAddr,
647 const std::string& uri,
648 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
649 ObserveCallback& callback, QualityOfService QoS)
653 return OC_STACK_INVALID_PARAM;
655 OCStackResult result;
657 ClientCallbackContext::ObserveContext* ctx =
658 new ClientCallbackContext::ObserveContext(callback);
659 OCCallbackData cbdata(
660 static_cast<void*>(ctx),
661 observeResourceCallback,
662 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
666 if (observeType == ObserveType::Observe)
668 method = OC_REST_OBSERVE;
670 else if (observeType == ObserveType::ObserveAll)
672 method = OC_REST_OBSERVE_ALL;
676 method = OC_REST_OBSERVE_ALL;
679 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
681 auto cLock = m_csdkLock.lock();
685 std::lock_guard<std::recursive_mutex> lock(*cLock);
686 OCHeaderOption options[MAX_HEADER_OPTIONS];
688 result = OCDoResource(handle, method,
689 url.c_str(), &devAddr,
692 static_cast<OCQualityOfService>(QoS),
694 assembleHeaderOptions(options, headerOptions),
695 headerOptions.size());
700 return OC_STACK_ERROR;
706 OCStackResult InProcClientWrapper::CancelObserveResource(
708 const std::string& host, // unused
709 const std::string& uri, // unused
710 const HeaderOptions& headerOptions,
711 QualityOfService QoS)
713 OCStackResult result;
714 auto cLock = m_csdkLock.lock();
718 std::lock_guard<std::recursive_mutex> lock(*cLock);
719 OCHeaderOption options[MAX_HEADER_OPTIONS];
721 result = OCCancel(handle,
722 static_cast<OCQualityOfService>(QoS),
723 assembleHeaderOptions(options, headerOptions),
724 headerOptions.size());
728 result = OC_STACK_ERROR;
734 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
735 OCClientResponse* clientResponse)
737 ClientCallbackContext::SubscribePresenceContext* context =
738 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
741 * This a hack while we rethink presence subscription.
743 std::string url = clientResponse->devAddr.addr;
745 std::thread exec(context->callback, clientResponse->result,
746 clientResponse->sequenceNumber, url);
750 return OC_STACK_KEEP_TRANSACTION;
753 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
754 const std::string& host, const std::string& resourceType,
755 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
759 return OC_STACK_INVALID_PARAM;
762 ClientCallbackContext::SubscribePresenceContext* ctx =
763 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
764 OCCallbackData cbdata(
765 static_cast<void*>(ctx),
766 subscribePresenceCallback,
768 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
771 auto cLock = m_csdkLock.lock();
773 std::ostringstream os;
774 os << host << "/oc/presence";
776 if(!resourceType.empty())
778 os << "?rt=" << resourceType;
784 return OC_STACK_ERROR;
787 return OCDoResource(handle, OC_REST_PRESENCE,
788 os.str().c_str(), nullptr,
789 nullptr, connectivityType,
790 OC_LOW_QOS, &cbdata, NULL, 0);
793 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
795 OCStackResult result;
796 auto cLock = m_csdkLock.lock();
800 std::lock_guard<std::recursive_mutex> lock(*cLock);
801 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
805 result = OC_STACK_ERROR;
811 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
817 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
818 const HeaderOptions& headerOptions)
822 if( headerOptions.size() == 0)
827 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
829 options[i] = OCHeaderOption(OC_COAP_ID,
831 it->getOptionData().length() + 1,
832 reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));