1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
// Constructs the in-process client wrapper from the shared C-SDK lock and the
// platform configuration. m_threadRun is initialised to false and csdkLock is
// stored in m_csdkLock.
// When m_cfg.mode is ModeType::Client the visible code masks both connectivity
// settings with CT_MASK_FLAGS, initialises the stack with
// OCInit1(OC_CLIENT, serverFlags, clientFlags) and throws
// InitializeException(STACK_INIT_ERROR, result) if that call does not return
// OC_STACK_OK. A thread running listeningFunc is assigned to m_listeningThread.
// NOTE(review): this listing omits some original lines (the rest of the
// initializer list, the braces, and whatever sets m_threadRun before the thread
// starts), so the exact nesting of the thread start relative to the Client
// check must be confirmed against the full file.
32 InProcClientWrapper::InProcClientWrapper(
33 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34 : m_threadRun(false), m_csdkLock(csdkLock),
37 // if the config type is server, we ought to never get called. If the config type
38 // is both, we count on the server to run the thread and do the initialize
40 if(m_cfg.mode == ModeType::Client)
42 OCTransportFlags serverFlags =
43 static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44 OCTransportFlags clientFlags =
45 static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46 OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
48 if(OC_STACK_OK != result)
50 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
54 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Destructor: joins m_listeningThread when m_threadRun is set and the thread is
// still joinable, then checks whether this wrapper is in ModeType::Client.
// NOTE(review): the statement that makes the listening thread finish before the
// join, and the body of the Client-mode branch, are not present in this
// listing — confirm against the full file.
58 InProcClientWrapper::~InProcClientWrapper()
60 if(m_threadRun && m_listeningThread.joinable())
63 m_listeningThread.join();
66 // only stop if we are the ones who actually called 'init'. We are counting
67 // on the server to do the stop.
68 if(m_cfg.mode == ModeType::Client)
// Entry point of the thread stored in m_listeningThread.
// The visible code promotes the weak m_csdkLock to a shared_ptr, holds the
// recursive mutex through a lock_guard, records OC_STACK_ERROR in `result` on
// one path, leaves a non-OK result unhandled (see the TODO) and sleeps for
// 10 ms.
// NOTE(review): the loop condition, the declaration of `result`, the check on
// `cLock` and the stack call made under the lock are not present in this
// listing — confirm against the full file.
74 void InProcClientWrapper::listeningFunc()
79 auto cLock = m_csdkLock.lock();
82 std::lock_guard<std::recursive_mutex> lock(*cLock);
87 result = OC_STACK_ERROR;
90 if(result != OC_STACK_OK)
92 // TODO: do something with result if failed?
95 // To minimize CPU utilization we may wish to do this with sleep
96 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Converts the payload of a client response into an OCRepresentation.
// Returns a default-constructed OCRepresentation when the payload is null, when
// its type is none of PAYLOAD_TYPE_DEVICE / PAYLOAD_TYPE_PLATFORM /
// PAYLOAD_TYPE_REPRESENTATION, or when the parsed container holds no
// representations. Otherwise the first representation becomes the root and the
// representations visited by std::for_each are attached to it with addChild().
// The payload is not destroyed here (both OCPayloadDestroy calls are commented out).
// NOTE(review): clientResponse is dereferenced without a null check.
// NOTE(review): the declaration of `oc`, any advance of `it` before the
// for_each, and the final return are not present in this listing — confirm
// that `it` is advanced so the root is not added as its own child.
100 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
102 if(clientResponse->payload == nullptr ||
104 clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105 clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106 clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
110 //OCPayloadDestroy(clientResponse->payload);
111 return OCRepresentation();
115 oc.setPayload(clientResponse->payload);
116 //OCPayloadDestroy(clientResponse->payload);
118 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119 if(it == oc.representations().end())
121 return OCRepresentation();
124 // first one is considered the root, everything else is considered a child of this one.
125 OCRepresentation root = *it;
128 std::for_each(it, oc.representations().end(),
129 [&root](const OCRepresentation& repItr)
130 {root.addChild(repItr);});
// Stack callback for resource discovery responses.
// ctx is the ClientCallbackContext::ListenContext supplied when the request was
// issued. The response is logged and ignored when its result is not
// OC_STACK_OK, when the payload is null or not PAYLOAD_TYPE_DISCOVERY, or (per
// the log text) when the client wrapper can no longer be locked. Otherwise a
// ListenOCContainer is built from the discovery payload and a std::thread
// invoking context->callback is created for each resource it exposes.
// Every visible return is OC_STACK_KEEP_TRANSACTION.
// NOTE(review): the join/detach that must follow each std::thread is not
// present in this listing; destroying a joinable std::thread calls
// std::terminate — confirm against the full file.
// NOTE(review): `for(auto resource : ...)` copies each element; a const
// reference would avoid that if the element type is expensive to copy.
135 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
136 OCClientResponse* clientResponse)
138 ClientCallbackContext::ListenContext* context =
139 static_cast<ClientCallbackContext::ListenContext*>(ctx);
141 if(clientResponse->result != OC_STACK_OK)
143 oclog() << "listenCallback(): failed to create resource. clientResponse: "
144 << clientResponse->result
147 return OC_STACK_KEEP_TRANSACTION;
150 if(!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
152 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
154 return OC_STACK_KEEP_TRANSACTION;
157 auto clientWrapper = context->clientWrapper.lock();
161 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
163 return OC_STACK_KEEP_TRANSACTION;
166 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
167 reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
168 // loop to ensure valid construction of all resources
169 for(auto resource : container.Resources())
171 std::thread exec(context->callback, resource);
176 return OC_STACK_KEEP_TRANSACTION;
// Issues a discovery request (OC_REST_DISCOVER) for resources.
// The request URI is serviceUrl immediately followed by resourceType (plain
// concatenation, no separator is inserted). A heap-allocated ListenContext
// carrying the callback and a reference to this wrapper is handed to the stack
// through OCCallbackData, whose deleter lambda frees it. The request is sent
// while holding the recursive C-SDK mutex, with QoS cast to OCQualityOfService.
// Returns OC_STACK_INVALID_PARAM on the early-exit path and records
// OC_STACK_ERROR on the failure path.
// NOTE(review): the guard before the INVALID_PARAM return, the callback
// argument of cbdata, the `cLock` check and the final return are not present
// in this listing — confirm against the full file.
// NOTE(review): nothing visible deletes `context` on the OC_STACK_ERROR path
// (where the stack never takes ownership) — possible leak, confirm.
179 OCStackResult InProcClientWrapper::ListenForResource(
180 const std::string& serviceUrl,
181 const std::string& resourceType,
182 OCConnectivityType connectivityType,
183 FindCallback& callback, QualityOfService QoS)
187 return OC_STACK_INVALID_PARAM;
190 OCStackResult result;
191 ostringstream resourceUri;
192 resourceUri << serviceUrl << resourceType;
194 ClientCallbackContext::ListenContext* context =
195 new ClientCallbackContext::ListenContext(callback, shared_from_this());
196 OCCallbackData cbdata(
197 static_cast<void*>(context),
199 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
202 auto cLock = m_csdkLock.lock();
205 std::lock_guard<std::recursive_mutex> lock(*cLock);
206 result = OCDoResource(nullptr, OC_REST_DISCOVER,
207 resourceUri.str().c_str(),
208 nullptr, nullptr, connectivityType,
209 static_cast<OCQualityOfService>(QoS),
216 result = OC_STACK_ERROR;
// Stack callback for device-information discovery responses.
// ctx is the ClientCallbackContext::DeviceListenContext supplied with the
// request. The response is parsed with parseGetSetCallback and a std::thread
// invoking context->callback with the resulting representation is created.
// An OC::OCException is logged and the response is ignored.
// Always returns OC_STACK_KEEP_TRANSACTION on the visible path.
// NOTE(review): the `try` line and the join/detach of `exec` are not present
// in this listing; a joinable std::thread must be joined or detached before it
// is destroyed — confirm against the full file.
221 OCStackApplicationResult listenDeviceCallback(void* ctx,
222 OCDoHandle /*handle*/,
223 OCClientResponse* clientResponse)
225 ClientCallbackContext::DeviceListenContext* context =
226 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
230 OCRepresentation rep = parseGetSetCallback(clientResponse);
231 std::thread exec(context->callback, rep);
234 catch(OC::OCException& e)
236 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
237 <<e.what() <<std::flush;
240 return OC_STACK_KEEP_TRANSACTION;
// Issues a discovery request (OC_REST_DISCOVER) for device information.
// The request URI is serviceUrl immediately followed by deviceURI (plain
// concatenation). A heap-allocated DeviceListenContext carrying the callback
// and a reference to this wrapper is registered with listenDeviceCallback via
// OCCallbackData; its deleter lambda frees the context. The request is sent
// while holding the recursive C-SDK mutex.
// Returns OC_STACK_INVALID_PARAM on the early-exit path and records
// OC_STACK_ERROR on the failure path.
// NOTE(review): the guard before the INVALID_PARAM return, the `cLock` check,
// the tail of the OCDoResource argument list and the final return are not
// present in this listing — confirm against the full file.
// NOTE(review): nothing visible deletes `context` on the OC_STACK_ERROR
// path — possible leak, confirm.
243 OCStackResult InProcClientWrapper::ListenForDevice(
244 const std::string& serviceUrl,
245 const std::string& deviceURI,
246 OCConnectivityType connectivityType,
247 FindDeviceCallback& callback,
248 QualityOfService QoS)
252 return OC_STACK_INVALID_PARAM;
254 OCStackResult result;
255 ostringstream deviceUri;
256 deviceUri << serviceUrl << deviceURI;
258 ClientCallbackContext::DeviceListenContext* context =
259 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
260 OCCallbackData cbdata(
261 static_cast<void*>(context),
262 listenDeviceCallback,
263 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
266 auto cLock = m_csdkLock.lock();
269 std::lock_guard<std::recursive_mutex> lock(*cLock);
270 result = OCDoResource(nullptr, OC_REST_DISCOVER,
271 deviceUri.str().c_str(),
272 nullptr, nullptr, connectivityType,
273 static_cast<OCQualityOfService>(QoS),
280 result = OC_STACK_ERROR;
// Copies the vendor-specific header options received in clientResponse into
// serverHeaderOptions: for each of the numRcvdVendorSpecificHeaderOptions
// entries an OCHeaderOption(optionID, optionData) is appended.
// On the other path " Invalid response " is written to std::cout.
// NOTE(review): optionData is assigned from a `const char*`, so std::string
// copies up to the first NUL byte — this assumes the option bytes are
// NUL-terminated text; binary options would be truncated or over-read. Confirm.
// NOTE(review): the loop index is `int`; if the count field is unsigned the
// comparison mixes signedness.
// NOTE(review): the condition selecting between the two paths and the
// declaration of `optionID` are not present in this listing.
285 void parseServerHeaderOptions(OCClientResponse* clientResponse,
286 HeaderOptions& serverHeaderOptions)
290 // Parse header options from server
292 std::string optionData;
294 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
296 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
297 optionData = reinterpret_cast<const char*>
298 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
299 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
300 serverHeaderOptions.push_back(headerOption);
305 // clientResponse is invalid
306 // TODO check proper logging
307 std::cout << " Invalid response " << std::endl;
// Stack callback for GET responses.
// ctx is the ClientCallbackContext::GetContext supplied with the request.
// When the response result is OC_STACK_OK the server header options and the
// payload representation are parsed; an OC::OCException from parsing is caught.
// A std::thread invoking context->callback with the header options, the
// representation and the result code is then created.
// Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): clientResponse is dereferenced without a null check.
// NOTE(review): the `try` line, the catch body and the join/detach of `exec`
// are not present in this listing — confirm against the full file.
311 OCStackApplicationResult getResourceCallback(void* ctx,
312 OCDoHandle /*handle*/,
313 OCClientResponse* clientResponse)
315 ClientCallbackContext::GetContext* context =
316 static_cast<ClientCallbackContext::GetContext*>(ctx);
318 OCRepresentation rep;
319 HeaderOptions serverHeaderOptions;
320 OCStackResult result = clientResponse->result;
321 if(result == OC_STACK_OK)
323 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
326 rep = parseGetSetCallback(clientResponse);
328 catch(OC::OCException& e)
334 std::thread exec(context->callback, serverHeaderOptions, rep, result);
336 return OC_STACK_DELETE_TRANSACTION;
// Sends an OC_REST_GET request.
// The request URI is built from resourceUri and queryParams by
// assembleSetResourceUri. A heap-allocated GetContext holding the callback is
// handed to the stack through OCCallbackData, whose deleter lambda frees it.
// Under the recursive C-SDK mutex, headerOptions are converted into a local
// OCHeaderOption[MAX_HEADER_OPTIONS] array by assembleHeaderOptions and passed
// together with headerOptions.size().
// Returns OC_STACK_INVALID_PARAM on the early-exit path and records
// OC_STACK_ERROR on the failure path.
// NOTE(review): several OCDoResource arguments, the callback argument of
// cbdata, the `cLock` check and the final return are not present in this
// listing — confirm against the full file.
// NOTE(review): headerOptions.size() is passed as the option count while the
// array holds MAX_HEADER_OPTIONS entries; confirm the size is bounded.
339 OCStackResult InProcClientWrapper::GetResourceRepresentation(
340 const OCDevAddr& devAddr,
341 const std::string& resourceUri,
342 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
343 GetCallback& callback, QualityOfService QoS)
347 return OC_STACK_INVALID_PARAM;
349 OCStackResult result;
350 ClientCallbackContext::GetContext* ctx =
351 new ClientCallbackContext::GetContext(callback);
352 OCCallbackData cbdata(
353 static_cast<void*>(ctx),
355 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
358 std::string uri = assembleSetResourceUri(resourceUri, queryParams);
360 auto cLock = m_csdkLock.lock();
364 std::lock_guard<std::recursive_mutex> lock(*cLock);
365 OCHeaderOption options[MAX_HEADER_OPTIONS];
367 result = OCDoResource(
368 nullptr, OC_REST_GET,
372 static_cast<OCQualityOfService>(QoS),
374 assembleHeaderOptions(options, headerOptions),
375 headerOptions.size());
380 result = OC_STACK_ERROR;
// Stack callback shared by the requests that register a SetContext.
// When the response result is OC_STACK_OK, OC_STACK_RESOURCE_CREATED or
// OC_STACK_RESOURCE_DELETED the server header options and the payload
// representation are parsed; an OC::OCException from parsing is caught.
// A std::thread invoking context->callback with the header options, the
// representation and the result code is then created.
// Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): clientResponse is dereferenced without a null check.
// NOTE(review): the `try` line, the catch body and the join/detach of `exec`
// are not present in this listing — confirm against the full file.
386 OCStackApplicationResult setResourceCallback(void* ctx,
387 OCDoHandle /*handle*/,
388 OCClientResponse* clientResponse)
390 ClientCallbackContext::SetContext* context =
391 static_cast<ClientCallbackContext::SetContext*>(ctx);
392 OCRepresentation attrs;
393 HeaderOptions serverHeaderOptions;
395 OCStackResult result = clientResponse->result;
396 if (OC_STACK_OK == result ||
397 OC_STACK_RESOURCE_CREATED == result ||
398 OC_STACK_RESOURCE_DELETED == result)
400 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
403 attrs = parseGetSetCallback(clientResponse);
405 catch(OC::OCException& e)
411 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
413 return OC_STACK_DELETE_TRANSACTION;
// Builds a request URI from a base uri and a set of query parameters.
// A single trailing '/' is removed from uri. Each parameter is written as
// "key=value;" and the final ';' is stripped before the query text is appended
// to the uri.
// NOTE(review): std::string::back() on an empty string is undefined
// behaviour. `uri.back()` is reached unguarded when uri is empty, and
// `queryString.back()` appears to be reached with an empty string whenever
// queryParams is empty (nothing visible writes to paramsList in that case).
// Guard both with !empty().
// NOTE(review): the body of the `queryParams.size() > 0` branch and the final
// return are not present in this listing — confirm against the full file.
416 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
417 const QueryParamsMap& queryParams)
419 if(uri.back() == '/')
421 uri.resize(uri.size()-1);
424 ostringstream paramsList;
425 if(queryParams.size() > 0)
430 for(auto& param : queryParams)
432 paramsList << param.first <<'='<<param.second<<';';
435 std::string queryString = paramsList.str();
436 if(queryString.back() == ';')
438 queryString.resize(queryString.size() - 1);
441 std::string ret = uri + queryString;
// Wraps rep in a local MessageContainer and returns the container's payload
// reinterpreted as an OCPayload*.
// NOTE(review): ownership of the returned pointer is not visible here;
// presumably the caller (or the stack it is handed to) must release it —
// confirm against MessageContainer::getPayload.
445 OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
447 MessageContainer ocInfo;
448 ocInfo.addRepresentation(rep);
449 return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
// Sends an OC_REST_POST request carrying rep to devAddr.
// The request URI is built from uri and queryParams by assembleSetResourceUri
// and the payload by assembleSetResourcePayload. A heap-allocated SetContext
// holding the callback is handed to the stack through OCCallbackData, whose
// deleter lambda frees it. Under the recursive C-SDK mutex, headerOptions are
// converted into a local OCHeaderOption[MAX_HEADER_OPTIONS] array and passed
// together with headerOptions.size().
// Returns OC_STACK_INVALID_PARAM on the early-exit path and records
// OC_STACK_ERROR on the failure path.
// NOTE(review): the guard before the INVALID_PARAM return, the callback
// argument of cbdata, some OCDoResource arguments, the `cLock` check and the
// final return are not present in this listing — confirm against the full file.
// NOTE(review): nothing visible releases `ctx` on the OC_STACK_ERROR path —
// possible leak, confirm.
452 OCStackResult InProcClientWrapper::PostResourceRepresentation(
453 const OCDevAddr& devAddr,
454 const std::string& uri,
455 const OCRepresentation& rep,
456 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
457 PostCallback& callback, QualityOfService QoS)
461 return OC_STACK_INVALID_PARAM;
463 OCStackResult result;
464 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
465 OCCallbackData cbdata(
466 static_cast<void*>(ctx),
468 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
471 std::string url = assembleSetResourceUri(uri, queryParams);
473 auto cLock = m_csdkLock.lock();
477 std::lock_guard<std::recursive_mutex> lock(*cLock);
478 OCHeaderOption options[MAX_HEADER_OPTIONS];
480 result = OCDoResource(nullptr, OC_REST_POST,
481 url.c_str(), &devAddr,
482 assembleSetResourcePayload(rep),
484 static_cast<OCQualityOfService>(QoS),
486 assembleHeaderOptions(options, headerOptions),
487 headerOptions.size());
492 result = OC_STACK_ERROR;
// Sends an OC_REST_PUT request carrying rep to devAddr.
// The request URI is built from uri and queryParams by assembleSetResourceUri
// and the payload by assembleSetResourcePayload. A heap-allocated SetContext
// holding the callback is handed to the stack through OCCallbackData, whose
// deleter lambda frees it. Under the recursive C-SDK mutex, headerOptions are
// converted into a local OCHeaderOption[MAX_HEADER_OPTIONS] array and passed
// together with headerOptions.size(). Unlike the POST variant, the address of
// a `handle` variable is passed as the first OCDoResource argument.
// Returns OC_STACK_INVALID_PARAM on the early-exit path and records
// OC_STACK_ERROR on the failure path.
// NOTE(review): `.c_str()` on the temporary returned by assembleSetResourceUri
// forces an extra copy and would truncate at an embedded NUL; assigning the
// std::string directly (as the POST variant does) is equivalent otherwise.
// NOTE(review): the declaration of `handle`, the callback argument of cbdata,
// some OCDoResource arguments, the `cLock` check and the final return are not
// present in this listing — confirm against the full file.
498 OCStackResult InProcClientWrapper::PutResourceRepresentation(
499 const OCDevAddr& devAddr,
500 const std::string& uri,
501 const OCRepresentation& rep,
502 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
503 PutCallback& callback, QualityOfService QoS)
507 return OC_STACK_INVALID_PARAM;
509 OCStackResult result;
510 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
511 OCCallbackData cbdata(
512 static_cast<void*>(ctx),
514 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
517 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
519 auto cLock = m_csdkLock.lock();
523 std::lock_guard<std::recursive_mutex> lock(*cLock);
525 OCHeaderOption options[MAX_HEADER_OPTIONS];
527 result = OCDoResource(&handle, OC_REST_PUT,
528 url.c_str(), &devAddr,
529 assembleSetResourcePayload(rep),
531 static_cast<OCQualityOfService>(QoS),
533 assembleHeaderOptions(options, headerOptions),
534 headerOptions.size());
539 result = OC_STACK_ERROR;
// Stack callback for DELETE responses.
// ctx is the ClientCallbackContext::DeleteContext supplied with the request.
// Server header options are parsed only when the response result is
// OC_STACK_OK. A std::thread invoking context->callback with the header
// options and the result code is then created.
// Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): clientResponse is dereferenced without a null check.
// NOTE(review): the join/detach of `exec` is not present in this listing; a
// joinable std::thread must be joined or detached before it is destroyed.
545 OCStackApplicationResult deleteResourceCallback(void* ctx,
546 OCDoHandle /*handle*/,
547 OCClientResponse* clientResponse)
549 ClientCallbackContext::DeleteContext* context =
550 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
551 HeaderOptions serverHeaderOptions;
553 if(clientResponse->result == OC_STACK_OK)
555 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
557 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
559 return OC_STACK_DELETE_TRANSACTION;
// Sends an OC_REST_DELETE request for uri to devAddr.
// A heap-allocated DeleteContext holding the callback is registered with
// deleteResourceCallback through OCCallbackData; its deleter lambda frees the
// context. headerOptions are converted into a local
// OCHeaderOption[MAX_HEADER_OPTIONS] array and passed together with
// headerOptions.size(). The request is sent while holding the recursive C-SDK
// mutex.
// The QoS parameter is unnamed and unused: the request always uses m_cfg.QoS.
// Returns OC_STACK_INVALID_PARAM on the early-exit path and records
// OC_STACK_ERROR on the failure path.
// NOTE(review): ignoring the caller-supplied QoS differs from the GET/POST/PUT
// variants, which forward their QoS argument — confirm this is intended.
// NOTE(review): the guard before the INVALID_PARAM return, some OCDoResource
// arguments, the `cLock` check and the final return are not present in this
// listing — confirm against the full file.
562 OCStackResult InProcClientWrapper::DeleteResource(
563 const OCDevAddr& devAddr,
564 const std::string& uri,
565 const HeaderOptions& headerOptions,
566 DeleteCallback& callback,
567 QualityOfService /*QoS*/)
571 return OC_STACK_INVALID_PARAM;
573 OCStackResult result;
574 ClientCallbackContext::DeleteContext* ctx =
575 new ClientCallbackContext::DeleteContext(callback);
576 OCCallbackData cbdata(
577 static_cast<void*>(ctx),
578 deleteResourceCallback,
579 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
582 auto cLock = m_csdkLock.lock();
586 OCHeaderOption options[MAX_HEADER_OPTIONS];
588 std::lock_guard<std::recursive_mutex> lock(*cLock);
590 result = OCDoResource(nullptr, OC_REST_DELETE,
591 uri.c_str(), &devAddr,
594 static_cast<OCQualityOfService>(m_cfg.QoS),
596 assembleHeaderOptions(options, headerOptions),
597 headerOptions.size());
602 result = OC_STACK_ERROR;
// Stack callback for observe notifications.
// ctx is the ClientCallbackContext::ObserveContext supplied with the request.
// When the response result is OC_STACK_OK the server header options and the
// payload representation are parsed; an OC::OCException from parsing is caught.
// A std::thread invoking context->callback with the header options, the
// representation, the result code and the sequence number is then created.
// Returns OC_STACK_DELETE_TRANSACTION when the sequence number equals
// OC_OBSERVE_DEREGISTER, otherwise OC_STACK_KEEP_TRANSACTION.
// NOTE(review): clientResponse is dereferenced without a null check.
// NOTE(review): the `try` line, the catch body and the join/detach of `exec`
// are not present in this listing — confirm against the full file.
608 OCStackApplicationResult observeResourceCallback(void* ctx,
609 OCDoHandle /*handle*/,
610 OCClientResponse* clientResponse)
612 ClientCallbackContext::ObserveContext* context =
613 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
614 OCRepresentation attrs;
615 HeaderOptions serverHeaderOptions;
616 uint32_t sequenceNumber = clientResponse->sequenceNumber;
617 OCStackResult result = clientResponse->result;
618 if(clientResponse->result == OC_STACK_OK)
620 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
623 attrs = parseGetSetCallback(clientResponse);
625 catch(OC::OCException& e)
630 std::thread exec(context->callback, serverHeaderOptions, attrs,
631 result, sequenceNumber);
633 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
635 return OC_STACK_DELETE_TRANSACTION;
637 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation on uri at devAddr.
// The stack method is OC_REST_OBSERVE for ObserveType::Observe and
// OC_REST_OBSERVE_ALL for ObserveType::ObserveAll; the remaining assignment
// also selects OC_REST_OBSERVE_ALL. The request URI is built from uri and
// queryParams by assembleSetResourceUri. A heap-allocated ObserveContext
// holding the callback is registered with observeResourceCallback through
// OCCallbackData; its deleter lambda frees the context. Under the recursive
// C-SDK mutex the request is issued with the caller's handle pointer, so the
// stack can report the operation handle through it.
// Returns OC_STACK_INVALID_PARAM on the early-exit path and OC_STACK_ERROR on
// the failure path.
// NOTE(review): `.c_str()` on the temporary returned by assembleSetResourceUri
// forces an extra copy and would truncate at an embedded NUL.
// NOTE(review): the failure path returns directly instead of assigning
// `result` like the sibling methods; nothing visible releases `ctx` there.
// NOTE(review): the declaration of `method`, the guard before the
// INVALID_PARAM return, some OCDoResource arguments, the `cLock` check and the
// final return are not present in this listing — confirm against the full file.
640 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
641 const OCDevAddr& devAddr,
642 const std::string& uri,
643 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
644 ObserveCallback& callback, QualityOfService QoS)
648 return OC_STACK_INVALID_PARAM;
650 OCStackResult result;
652 ClientCallbackContext::ObserveContext* ctx =
653 new ClientCallbackContext::ObserveContext(callback);
654 OCCallbackData cbdata(
655 static_cast<void*>(ctx),
656 observeResourceCallback,
657 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
661 if (observeType == ObserveType::Observe)
663 method = OC_REST_OBSERVE;
665 else if (observeType == ObserveType::ObserveAll)
667 method = OC_REST_OBSERVE_ALL;
671 method = OC_REST_OBSERVE_ALL;
674 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
676 auto cLock = m_csdkLock.lock();
680 std::lock_guard<std::recursive_mutex> lock(*cLock);
681 OCHeaderOption options[MAX_HEADER_OPTIONS];
683 result = OCDoResource(handle, method,
684 url.c_str(), &devAddr,
687 static_cast<OCQualityOfService>(QoS),
689 assembleHeaderOptions(options, headerOptions),
690 headerOptions.size());
695 return OC_STACK_ERROR;
// Cancels an observation by calling OCCancel on `handle` while holding the
// recursive C-SDK mutex. headerOptions are converted into a local
// OCHeaderOption[MAX_HEADER_OPTIONS] array and passed together with
// headerOptions.size(); QoS is cast to OCQualityOfService.
// The host and uri parameters are unnamed and unused.
// Records OC_STACK_ERROR on the failure path.
// NOTE(review): the parameter that declares `handle`, the `cLock` check and
// the final return are not present in this listing — confirm against the full
// file.
701 OCStackResult InProcClientWrapper::CancelObserveResource(
703 const std::string& /*host*/,
704 const std::string& /*uri*/,
705 const HeaderOptions& headerOptions,
706 QualityOfService QoS)
708 OCStackResult result;
709 auto cLock = m_csdkLock.lock();
713 std::lock_guard<std::recursive_mutex> lock(*cLock);
714 OCHeaderOption options[MAX_HEADER_OPTIONS];
716 result = OCCancel(handle,
717 static_cast<OCQualityOfService>(QoS),
718 assembleHeaderOptions(options, headerOptions),
719 headerOptions.size());
723 result = OC_STACK_ERROR;
// Stack callback for presence notifications.
// ctx is the ClientCallbackContext::SubscribePresenceContext supplied with the
// subscription. The sender address text (clientResponse->devAddr.addr) is
// copied into a std::string and a std::thread invoking context->callback with
// the response result, the sequence number and that address is created.
// Returns OC_STACK_KEEP_TRANSACTION.
// NOTE(review): clientResponse is dereferenced without a null check.
// NOTE(review): the join/detach of `exec` is not present in this listing; a
// joinable std::thread must be joined or detached before it is destroyed.
729 OCStackApplicationResult subscribePresenceCallback(void* ctx,
730 OCDoHandle /*handle*/,
731 OCClientResponse* clientResponse)
733 ClientCallbackContext::SubscribePresenceContext* context =
734 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
737 * This a hack while we rethink presence subscription.
739 std::string url = clientResponse->devAddr.addr;
741 std::thread exec(context->callback, clientResponse->result,
742 clientResponse->sequenceNumber, url);
746 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications from host.
// The request URI is host followed by OC_RSRVD_PRESENCE_URI, with
// "?rt=<resourceType>" appended when resourceType is not empty. A
// heap-allocated SubscribePresenceContext holding presenceHandler is registered
// with subscribePresenceCallback through OCCallbackData; its deleter lambda
// frees the context. The request is issued as OC_REST_PRESENCE with OC_LOW_QOS
// and no header options, and the OCDoResource result is returned directly.
// Returns OC_STACK_INVALID_PARAM / OC_STACK_ERROR on the early-exit paths.
// NOTE(review): `cLock` is obtained but no lock_guard is visible in this
// listing, unlike the sibling methods — confirm the mutex is actually held
// around OCDoResource.
// NOTE(review): the guards before both early returns are not present in this
// listing; nothing visible releases `ctx` on the OC_STACK_ERROR path.
749 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
750 const std::string& host, const std::string& resourceType,
751 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
755 return OC_STACK_INVALID_PARAM;
758 ClientCallbackContext::SubscribePresenceContext* ctx =
759 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
760 OCCallbackData cbdata(
761 static_cast<void*>(ctx),
762 subscribePresenceCallback,
764 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
767 auto cLock = m_csdkLock.lock();
769 std::ostringstream os;
770 os << host << OC_RSRVD_PRESENCE_URI;
772 if(!resourceType.empty())
774 os << "?rt=" << resourceType;
780 return OC_STACK_ERROR;
783 return OCDoResource(handle, OC_REST_PRESENCE,
784 os.str().c_str(), nullptr,
785 nullptr, connectivityType,
786 OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels a presence subscription by calling OCCancel on handle with
// OC_LOW_QOS and no header options, while holding the recursive C-SDK mutex.
// Records OC_STACK_ERROR on the failure path.
// NOTE(review): the `cLock` check and the final return are not present in this
// listing — confirm against the full file.
789 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
791 OCStackResult result;
792 auto cLock = m_csdkLock.lock();
796 std::lock_guard<std::recursive_mutex> lock(*cLock);
797 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
801 result = OC_STACK_ERROR;
// NOTE(review): only the signature is present in this listing. Presumably it
// reports the configured default quality of service through the `qos`
// out-parameter and returns a status code — confirm against the full file.
807 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
813 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
814 const HeaderOptions& headerOptions)
818 if( headerOptions.size() == 0)
823 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
825 options[i] = OCHeaderOption(OC_COAP_ID,
827 it->getOptionData().length() + 1,
828 reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));