1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
// Builds the client-side wrapper around the C stack. m_threadRun starts out false and
// the weak reference to the shared CSDK lock is kept in m_csdkLock.
// For ModeType::Client the stack is initialized with OCInit1(OC_CLIENT, ...); any
// result other than OC_STACK_OK throws InitializeException carrying that result.
// A std::thread running listeningFunc() is stored in m_listeningThread.
32 InProcClientWrapper::InProcClientWrapper(
33 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34 : m_threadRun(false), m_csdkLock(csdkLock),
37 // if the config type is server, we ought to never get called. If the config type
38 // is both, we count on the server to run the thread and do the initialize
40 if(m_cfg.mode == ModeType::Client)
// Only the bits selected by CT_MASK_FLAGS are forwarded as transport flags.
42 OCTransportFlags serverFlags =
43 static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44 OCTransportFlags clientFlags =
45 static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46 OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
48 if(OC_STACK_OK != result)
50 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
// listeningFunc is a member function, so `this` is passed as the object it runs on.
54 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Destructor. Joins the listening thread when m_threadRun is set and the thread is
// joinable, then takes the client-mode-only shutdown path described in the comment below.
58 InProcClientWrapper::~InProcClientWrapper()
60 if(m_threadRun && m_listeningThread.joinable())
// join() blocks until listeningFunc() has returned.
63 m_listeningThread.join();
66 // only stop if we are the ones who actually called 'init'. We are counting
67 // on the server to do the stop.
68 if(m_cfg.mode == ModeType::Client)
// Function executed by m_listeningThread (the constructor starts the thread on it).
// Promotes the weak CSDK lock to a shared_ptr and holds the recursive mutex through a
// std::lock_guard while working with the stack. A result other than OC_STACK_OK is not
// acted upon yet (see TODO). The 10 ms sleep keeps the thread from spinning.
// NOTE(review): OC_STACK_ERROR looks like the result recorded when the lock could not
// be obtained - confirm.
74 void InProcClientWrapper::listeningFunc()
79 auto cLock = m_csdkLock.lock();
82 std::lock_guard<std::recursive_mutex> lock(*cLock);
87 result = OC_STACK_ERROR;
90 if(result != OC_STACK_OK)
92 // TODO: do something with result if failed?
95 // To minimize CPU utilization we may wish to do this with sleep
96 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Turns the payload of a client response into an OCRepresentation.
//
// An empty OCRepresentation is returned when the payload is null, when its type is
// none of PAYLOAD_TYPE_DEVICE / PAYLOAD_TYPE_PLATFORM / PAYLOAD_TYPE_REPRESENTATION,
// or when parsing yields no representation at all.
// Otherwise the first parsed representation becomes the root. It is stamped with the
// response's device address and resource URI, and representations are attached to it
// with addChild().
// The payload is not destroyed here (the OCPayloadDestroy calls are commented out).
100 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
102 if(clientResponse->payload == nullptr ||
104 clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105 clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106 clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
110 //OCPayloadDestroy(clientResponse->payload);
111 return OCRepresentation();
115 oc.setPayload(clientResponse->payload);
116 //OCPayloadDestroy(clientResponse->payload);
118 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
// Nothing was parsed out of the payload.
119 if(it == oc.representations().end())
121 return OCRepresentation();
124 // first one is considered the root, everything else is considered a child of this one.
125 OCRepresentation root = *it;
126 root.setDevAddr(clientResponse->devAddr);
127 root.setUri(clientResponse->resourceUri);
// Attach every representation from `it` up to the end as a child of root.
130 std::for_each(it, oc.representations().end(),
131 [&root](const OCRepresentation& repItr)
132 {root.addChild(repItr);});
// Stack callback for resource discovery responses.
//
// ctx is the ClientCallbackContext::ListenContext supplied when the request was made.
// Every visible exit returns OC_STACK_KEEP_TRANSACTION. The response is logged and
// dropped when its result is not OC_STACK_OK, when the payload is missing or is not a
// PAYLOAD_TYPE_DISCOVERY payload, or when the owning client wrapper can no longer be
// locked from its weak reference.
// For a usable response a ListenOCContainer is built from the discovery payload, and
// the user callback is started on a new std::thread once per contained resource.
137 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138 OCClientResponse* clientResponse)
140 ClientCallbackContext::ListenContext* context =
141 static_cast<ClientCallbackContext::ListenContext*>(ctx);
143 if(clientResponse->result != OC_STACK_OK)
145 oclog() << "listenCallback(): failed to create resource. clientResponse: "
146 << clientResponse->result
149 return OC_STACK_KEEP_TRANSACTION;
152 if(!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
154 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
156 return OC_STACK_KEEP_TRANSACTION;
// The context only holds a weak reference; lock() yields null once the wrapper is gone.
159 auto clientWrapper = context->clientWrapper.lock();
163 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
165 return OC_STACK_KEEP_TRANSACTION;
168 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
169 reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
170 // loop to ensure valid construction of all resources
171 for(auto resource : container.Resources())
// The user callback runs on its own thread, receiving a copy of the resource handle.
173 std::thread exec(context->callback, resource);
178 return OC_STACK_KEEP_TRANSACTION;
// Sends a resource discovery request (OC_REST_DISCOVER) to serviceUrl + resourceType.
//
// serviceUrl       prefix of the request URI
// resourceType     text appended directly after serviceUrl to form the request URI
// connectivityType transport selection forwarded to OCDoResource
// callback         user callback stored in the heap-allocated ListenContext
// QoS              cast to OCQualityOfService and forwarded to OCDoResource
//
// Returns the OCDoResource result, OC_STACK_INVALID_PARAM from the early guard, or
// OC_STACK_ERROR on the fallback path.
// NOTE(review): the guard condition is presumably an empty callback, and the fallback
// presumably a failed m_csdkLock.lock() - confirm both.
181 OCStackResult InProcClientWrapper::ListenForResource(
182 const std::string& serviceUrl,
183 const std::string& resourceType,
184 OCConnectivityType connectivityType,
185 FindCallback& callback, QualityOfService QoS)
189 return OC_STACK_INVALID_PARAM;
192 OCStackResult result;
193 ostringstream resourceUri;
194 resourceUri << serviceUrl << resourceType;
// The context carries the callback and a reference to this wrapper obtained via
// shared_from_this(). The deleter lambda given to OCCallbackData deletes it.
196 ClientCallbackContext::ListenContext* context =
197 new ClientCallbackContext::ListenContext(callback, shared_from_this());
198 OCCallbackData cbdata(
199 static_cast<void*>(context),
201 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
204 auto cLock = m_csdkLock.lock();
// Calls into the C stack are made while holding the shared recursive mutex.
207 std::lock_guard<std::recursive_mutex> lock(*cLock);
208 result = OCDoResource(nullptr, OC_REST_DISCOVER,
209 resourceUri.str().c_str(),
210 nullptr, nullptr, connectivityType,
211 static_cast<OCQualityOfService>(QoS),
218 result = OC_STACK_ERROR;
// Stack callback for device discovery responses.
//
// ctx is the ClientCallbackContext::DeviceListenContext supplied with the request.
// The response is converted with parseGetSetCallback() and the user callback is
// started on a new std::thread with the resulting representation. An OC::OCException
// is logged and the response ignored. Returns OC_STACK_KEEP_TRANSACTION.
223 OCStackApplicationResult listenDeviceCallback(void* ctx,
224 OCDoHandle /*handle*/,
225 OCClientResponse* clientResponse)
227 ClientCallbackContext::DeviceListenContext* context =
228 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
232 OCRepresentation rep = parseGetSetCallback(clientResponse);
233 std::thread exec(context->callback, rep);
236 catch(OC::OCException& e)
238 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
239 <<e.what() <<std::flush;
242 return OC_STACK_KEEP_TRANSACTION;
// Sends a device discovery request (OC_REST_DISCOVER) to serviceUrl + deviceURI, with
// listenDeviceCallback registered as the response handler.
//
// serviceUrl       prefix of the request URI
// deviceURI        text appended directly after serviceUrl to form the request URI
// connectivityType transport selection forwarded to OCDoResource
// callback         user callback stored in the heap-allocated DeviceListenContext
// QoS              cast to OCQualityOfService and forwarded to OCDoResource
//
// Returns the OCDoResource result, OC_STACK_INVALID_PARAM from the early guard, or
// OC_STACK_ERROR on the fallback path.
// NOTE(review): the guard condition is presumably an empty callback, and the fallback
// presumably a failed m_csdkLock.lock() - confirm both.
245 OCStackResult InProcClientWrapper::ListenForDevice(
246 const std::string& serviceUrl,
247 const std::string& deviceURI,
248 OCConnectivityType connectivityType,
249 FindDeviceCallback& callback,
250 QualityOfService QoS)
254 return OC_STACK_INVALID_PARAM;
256 OCStackResult result;
257 ostringstream deviceUri;
258 deviceUri << serviceUrl << deviceURI;
// The deleter lambda given to OCCallbackData deletes the context allocated here.
260 ClientCallbackContext::DeviceListenContext* context =
261 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
262 OCCallbackData cbdata(
263 static_cast<void*>(context),
264 listenDeviceCallback,
265 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
268 auto cLock = m_csdkLock.lock();
// Calls into the C stack are made while holding the shared recursive mutex.
271 std::lock_guard<std::recursive_mutex> lock(*cLock);
272 result = OCDoResource(nullptr, OC_REST_DISCOVER,
273 deviceUri.str().c_str(),
274 nullptr, nullptr, connectivityType,
275 static_cast<OCQualityOfService>(QoS),
282 result = OC_STACK_ERROR;
// Copies the vendor-specific header options received with a response into
// serverHeaderOptions, one HeaderOption::OCHeaderOption per received option.
//
// clientResponse      response whose rcvdVendorSpecificHeaderOptions are read
// serverHeaderOptions output container; parsed options are appended with push_back
//
// On the invalid-response path, " Invalid response " is written to std::cout.
287 void parseServerHeaderOptions(OCClientResponse* clientResponse,
288 HeaderOptions& serverHeaderOptions)
292 // Parse header options from server
294 std::string optionData;
296 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
298 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
// The option bytes are read as a C string, so the copy ends at the first NUL byte.
299 optionData = reinterpret_cast<const char*>
300 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
301 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
302 serverHeaderOptions.push_back(headerOption);
307 // clientResponse is invalid
308 // TODO check proper logging
309 std::cout << " Invalid response " << std::endl;
// Stack callback for GET responses.
//
// ctx is the ClientCallbackContext::GetContext supplied with the request. When the
// response result is OC_STACK_OK, the server header options and the representation
// are parsed; an OC::OCException from parsing is caught. The user callback is then
// started on a new std::thread with the header options, the representation and the
// result. Returns OC_STACK_DELETE_TRANSACTION.
313 OCStackApplicationResult getResourceCallback(void* ctx,
314 OCDoHandle /*handle*/,
315 OCClientResponse* clientResponse)
317 ClientCallbackContext::GetContext* context =
318 static_cast<ClientCallbackContext::GetContext*>(ctx);
320 OCRepresentation rep;
321 HeaderOptions serverHeaderOptions;
322 OCStackResult result = clientResponse->result;
// Header options and payload are only parsed for a successful response.
323 if(result == OC_STACK_OK)
325 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
328 rep = parseGetSetCallback(clientResponse);
330 catch(OC::OCException& e)
336 std::thread exec(context->callback, serverHeaderOptions, rep, result);
338 return OC_STACK_DELETE_TRANSACTION;
// Sends a GET request (OC_REST_GET) for a resource.
//
// devAddr       address of the target device
// resourceUri   resource path; combined with queryParams by assembleSetResourceUri()
// queryParams   key/value pairs appended to the URI as a query string
// headerOptions options converted with assembleHeaderOptions() and sent along
// callback      user callback stored in the heap-allocated GetContext
// QoS           cast to OCQualityOfService and forwarded to OCDoResource
//
// Returns the OCDoResource result, OC_STACK_INVALID_PARAM from the early guard, or
// OC_STACK_ERROR on the fallback path.
// NOTE(review): the guard condition is presumably an empty callback, and the fallback
// presumably a failed m_csdkLock.lock() - confirm both.
341 OCStackResult InProcClientWrapper::GetResourceRepresentation(
342 const OCDevAddr& devAddr,
343 const std::string& resourceUri,
344 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
345 GetCallback& callback, QualityOfService QoS)
349 return OC_STACK_INVALID_PARAM;
351 OCStackResult result;
// The deleter lambda given to OCCallbackData deletes the context allocated here.
352 ClientCallbackContext::GetContext* ctx =
353 new ClientCallbackContext::GetContext(callback);
354 OCCallbackData cbdata(
355 static_cast<void*>(ctx),
357 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
360 std::string uri = assembleSetResourceUri(resourceUri, queryParams);
362 auto cLock = m_csdkLock.lock();
// Calls into the C stack are made while holding the shared recursive mutex.
366 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Scratch array that assembleHeaderOptions() fills for the duration of the call.
367 OCHeaderOption options[MAX_HEADER_OPTIONS];
369 result = OCDoResource(
370 nullptr, OC_REST_GET,
374 static_cast<OCQualityOfService>(QoS),
376 assembleHeaderOptions(options, headerOptions),
377 headerOptions.size());
382 result = OC_STACK_ERROR;
// Stack callback for responses to requests that register a SetContext (the POST and
// PUT request methods allocate one).
//
// When the response result is OC_STACK_OK, OC_STACK_RESOURCE_CREATED or
// OC_STACK_RESOURCE_DELETED, the server header options and the representation are
// parsed; an OC::OCException from parsing is caught. The user callback is then started
// on a new std::thread with the header options, the representation and the result.
// Returns OC_STACK_DELETE_TRANSACTION.
388 OCStackApplicationResult setResourceCallback(void* ctx,
389 OCDoHandle /*handle*/,
390 OCClientResponse* clientResponse)
392 ClientCallbackContext::SetContext* context =
393 static_cast<ClientCallbackContext::SetContext*>(ctx);
394 OCRepresentation attrs;
395 HeaderOptions serverHeaderOptions;
397 OCStackResult result = clientResponse->result;
// Three result codes count as success for set-style requests.
398 if (OC_STACK_OK == result ||
399 OC_STACK_RESOURCE_CREATED == result ||
400 OC_STACK_RESOURCE_DELETED == result)
402 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
405 attrs = parseGetSetCallback(clientResponse);
407 catch(OC::OCException& e)
413 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
415 return OC_STACK_DELETE_TRANSACTION;
// Builds the request URI from a resource path and its query parameters.
//
// uri         resource path, taken by value; one trailing '/' is removed
// queryParams key/value pairs, each written as "key=value;" with the final ';' removed
//
// The result is the trimmed uri followed by the assembled query string.
// NOTE(review): uri.back() and queryString.back() are called with no emptiness check
// visible here; back() on an empty std::string is undefined behaviour - confirm callers
// never pass an empty uri and that queryString cannot be empty at that point.
418 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
419 const QueryParamsMap& queryParams)
421 if(uri.back() == '/')
423 uri.resize(uri.size()-1);
426 ostringstream paramsList;
427 if(queryParams.size() > 0)
432 for(auto& param : queryParams)
434 paramsList << param.first <<'='<<param.second<<';';
437 std::string queryString = paramsList.str();
// Drop the separator left behind by the last parameter.
438 if(queryString.back() == ';')
440 queryString.resize(queryString.size() - 1);
443 std::string ret = uri + queryString;
// Wraps a single representation in a MessageContainer and returns the payload the
// container produces, cast to the generic OCPayload pointer type.
// NOTE(review): ownership of the returned payload is not visible here; presumably the
// caller (or the stack it is handed to) releases it - confirm.
447 OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
449 MessageContainer ocInfo;
450 ocInfo.addRepresentation(rep);
451 return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
// Sends a POST request (OC_REST_POST) carrying a representation.
//
// devAddr       address of the target device
// uri           resource path; combined with queryParams by assembleSetResourceUri()
// rep           representation converted with assembleSetResourcePayload()
// queryParams   key/value pairs appended to the URI as a query string
// headerOptions options converted with assembleHeaderOptions() and sent along
// callback      user callback stored in the heap-allocated SetContext
// QoS           cast to OCQualityOfService and forwarded to OCDoResource
//
// Returns the OCDoResource result, OC_STACK_INVALID_PARAM from the early guard, or
// OC_STACK_ERROR on the fallback path.
// NOTE(review): the guard condition is presumably an empty callback, and the fallback
// presumably a failed m_csdkLock.lock() - confirm both.
454 OCStackResult InProcClientWrapper::PostResourceRepresentation(
455 const OCDevAddr& devAddr,
456 const std::string& uri,
457 const OCRepresentation& rep,
458 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
459 PostCallback& callback, QualityOfService QoS)
463 return OC_STACK_INVALID_PARAM;
465 OCStackResult result;
// The deleter lambda given to OCCallbackData deletes the context allocated here.
466 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
467 OCCallbackData cbdata(
468 static_cast<void*>(ctx),
470 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
473 std::string url = assembleSetResourceUri(uri, queryParams);
475 auto cLock = m_csdkLock.lock();
// Calls into the C stack are made while holding the shared recursive mutex.
479 std::lock_guard<std::recursive_mutex> lock(*cLock);
480 OCHeaderOption options[MAX_HEADER_OPTIONS];
482 result = OCDoResource(nullptr, OC_REST_POST,
483 url.c_str(), &devAddr,
484 assembleSetResourcePayload(rep),
486 static_cast<OCQualityOfService>(QoS),
488 assembleHeaderOptions(options, headerOptions),
489 headerOptions.size());
494 result = OC_STACK_ERROR;
// Sends a PUT request (OC_REST_PUT) carrying a representation.
//
// devAddr       address of the target device
// uri           resource path; combined with queryParams by assembleSetResourceUri()
// rep           representation converted with assembleSetResourcePayload()
// queryParams   key/value pairs appended to the URI as a query string
// headerOptions options converted with assembleHeaderOptions() and sent along
// callback      user callback stored in the heap-allocated SetContext
// QoS           cast to OCQualityOfService and forwarded to OCDoResource
//
// Unlike the POST variant, a handle (&handle) is passed to OCDoResource here.
// Returns the OCDoResource result, OC_STACK_INVALID_PARAM from the early guard, or
// OC_STACK_ERROR on the fallback path.
// NOTE(review): the guard condition is presumably an empty callback, and the fallback
// presumably a failed m_csdkLock.lock() - confirm both.
500 OCStackResult InProcClientWrapper::PutResourceRepresentation(
501 const OCDevAddr& devAddr,
502 const std::string& uri,
503 const OCRepresentation& rep,
504 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
505 PutCallback& callback, QualityOfService QoS)
509 return OC_STACK_INVALID_PARAM;
511 OCStackResult result;
// The deleter lambda given to OCCallbackData deletes the context allocated here.
512 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
513 OCCallbackData cbdata(
514 static_cast<void*>(ctx),
516 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
// NOTE(review): the .c_str() round trip builds a second string from the C-string view
// of the first; PostResourceRepresentation assigns the std::string directly.
519 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
521 auto cLock = m_csdkLock.lock();
// Calls into the C stack are made while holding the shared recursive mutex.
525 std::lock_guard<std::recursive_mutex> lock(*cLock);
527 OCHeaderOption options[MAX_HEADER_OPTIONS];
529 result = OCDoResource(&handle, OC_REST_PUT,
530 url.c_str(), &devAddr,
531 assembleSetResourcePayload(rep),
533 static_cast<OCQualityOfService>(QoS),
535 assembleHeaderOptions(options, headerOptions),
536 headerOptions.size());
541 result = OC_STACK_ERROR;
// Stack callback for DELETE responses.
//
// ctx is the ClientCallbackContext::DeleteContext supplied with the request. Server
// header options are parsed only when the response result is OC_STACK_OK. The user
// callback is started on a new std::thread with the header options and the response
// result. Returns OC_STACK_DELETE_TRANSACTION.
547 OCStackApplicationResult deleteResourceCallback(void* ctx,
548 OCDoHandle /*handle*/,
549 OCClientResponse* clientResponse)
551 ClientCallbackContext::DeleteContext* context =
552 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
553 HeaderOptions serverHeaderOptions;
555 if(clientResponse->result == OC_STACK_OK)
557 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
559 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
561 return OC_STACK_DELETE_TRANSACTION;
// Sends a DELETE request (OC_REST_DELETE) for a resource, with deleteResourceCallback
// registered as the response handler.
//
// devAddr       address of the target device
// uri           resource path, passed to OCDoResource unchanged
// headerOptions options converted with assembleHeaderOptions() and sent along
// callback      user callback stored in the heap-allocated DeleteContext
// (QoS)         unnamed and unused; the request is sent with m_cfg.QoS instead
//
// Returns the OCDoResource result, OC_STACK_INVALID_PARAM from the early guard, or
// OC_STACK_ERROR on the fallback path.
// NOTE(review): the guard condition is presumably an empty callback, and the fallback
// presumably a failed m_csdkLock.lock() - confirm both. The other request methods
// honour their QoS argument; confirm ignoring it here is intended.
564 OCStackResult InProcClientWrapper::DeleteResource(
565 const OCDevAddr& devAddr,
566 const std::string& uri,
567 const HeaderOptions& headerOptions,
568 DeleteCallback& callback,
569 QualityOfService /*QoS*/)
573 return OC_STACK_INVALID_PARAM;
575 OCStackResult result;
// The deleter lambda given to OCCallbackData deletes the context allocated here.
576 ClientCallbackContext::DeleteContext* ctx =
577 new ClientCallbackContext::DeleteContext(callback);
578 OCCallbackData cbdata(
579 static_cast<void*>(ctx),
580 deleteResourceCallback,
581 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
584 auto cLock = m_csdkLock.lock();
588 OCHeaderOption options[MAX_HEADER_OPTIONS];
// Calls into the C stack are made while holding the shared recursive mutex.
590 std::lock_guard<std::recursive_mutex> lock(*cLock);
592 result = OCDoResource(nullptr, OC_REST_DELETE,
593 uri.c_str(), &devAddr,
596 static_cast<OCQualityOfService>(m_cfg.QoS),
598 assembleHeaderOptions(options, headerOptions),
599 headerOptions.size());
604 result = OC_STACK_ERROR;
// Stack callback for observe notifications.
//
// ctx is the ClientCallbackContext::ObserveContext supplied with the request. When the
// response result is OC_STACK_OK, the server header options and the representation are
// parsed; an OC::OCException from parsing is caught. The user callback is then started
// on a new std::thread with the header options, representation, result and sequence
// number.
// Returns OC_STACK_DELETE_TRANSACTION when the sequence number equals
// OC_OBSERVE_DEREGISTER, otherwise OC_STACK_KEEP_TRANSACTION.
610 OCStackApplicationResult observeResourceCallback(void* ctx,
611 OCDoHandle /*handle*/,
612 OCClientResponse* clientResponse)
614 ClientCallbackContext::ObserveContext* context =
615 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
616 OCRepresentation attrs;
617 HeaderOptions serverHeaderOptions;
618 uint32_t sequenceNumber = clientResponse->sequenceNumber;
619 OCStackResult result = clientResponse->result;
620 if(clientResponse->result == OC_STACK_OK)
622 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
625 attrs = parseGetSetCallback(clientResponse);
627 catch(OC::OCException& e)
632 std::thread exec(context->callback, serverHeaderOptions, attrs,
633 result, sequenceNumber);
// A deregister sequence number ends the transaction; anything else keeps it open.
635 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
637 return OC_STACK_DELETE_TRANSACTION;
639 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation on a resource, with observeResourceCallback as the
// notification handler.
//
// observeType   ObserveType::Observe selects OC_REST_OBSERVE; ObserveType::ObserveAll
//               and every other value select OC_REST_OBSERVE_ALL
// handle        passed to OCDoResource as its handle argument
// devAddr       address of the target device
// uri           resource path; combined with queryParams by assembleSetResourceUri()
// queryParams   key/value pairs appended to the URI as a query string
// headerOptions options converted with assembleHeaderOptions() and sent along
// callback      user callback stored in the heap-allocated ObserveContext
// QoS           cast to OCQualityOfService and forwarded to OCDoResource
//
// Returns OC_STACK_INVALID_PARAM from the early guard or OC_STACK_ERROR on the
// fallback path.
// NOTE(review): the guard condition is presumably an empty callback, and the fallback
// presumably a failed m_csdkLock.lock() - confirm both.
642 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
643 const OCDevAddr& devAddr,
644 const std::string& uri,
645 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
646 ObserveCallback& callback, QualityOfService QoS)
650 return OC_STACK_INVALID_PARAM;
652 OCStackResult result;
// The deleter lambda given to OCCallbackData deletes the context allocated here.
654 ClientCallbackContext::ObserveContext* ctx =
655 new ClientCallbackContext::ObserveContext(callback);
656 OCCallbackData cbdata(
657 static_cast<void*>(ctx),
658 observeResourceCallback,
659 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
// Map the C++ observe type onto the stack's request method.
663 if (observeType == ObserveType::Observe)
665 method = OC_REST_OBSERVE;
667 else if (observeType == ObserveType::ObserveAll)
669 method = OC_REST_OBSERVE_ALL;
673 method = OC_REST_OBSERVE_ALL;
676 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
678 auto cLock = m_csdkLock.lock();
// Calls into the C stack are made while holding the shared recursive mutex.
682 std::lock_guard<std::recursive_mutex> lock(*cLock);
683 OCHeaderOption options[MAX_HEADER_OPTIONS];
685 result = OCDoResource(handle, method,
686 url.c_str(), &devAddr,
689 static_cast<OCQualityOfService>(QoS),
691 assembleHeaderOptions(options, headerOptions),
692 headerOptions.size());
// This path returns directly; the sibling request methods assign to result instead.
697 return OC_STACK_ERROR;
// Cancels an observation by calling OCCancel on the operation handle.
//
// (host, uri)   unnamed and unused
// headerOptions options converted with assembleHeaderOptions() and sent along
// QoS           cast to OCQualityOfService and forwarded to OCCancel
//
// The result is the OCCancel result, or OC_STACK_ERROR on the fallback path.
// NOTE(review): the fallback is presumably a failed m_csdkLock.lock() - confirm.
703 OCStackResult InProcClientWrapper::CancelObserveResource(
705 const std::string& /*host*/,
706 const std::string& /*uri*/,
707 const HeaderOptions& headerOptions,
708 QualityOfService QoS)
710 OCStackResult result;
711 auto cLock = m_csdkLock.lock();
// Calls into the C stack are made while holding the shared recursive mutex.
715 std::lock_guard<std::recursive_mutex> lock(*cLock);
716 OCHeaderOption options[MAX_HEADER_OPTIONS];
718 result = OCCancel(handle,
719 static_cast<OCQualityOfService>(QoS),
720 assembleHeaderOptions(options, headerOptions),
721 headerOptions.size());
725 result = OC_STACK_ERROR;
// Stack callback for presence notifications.
//
// ctx is the ClientCallbackContext::SubscribePresenceContext supplied with the
// request. The user callback is started on a new std::thread with the response
// result, the sequence number and the sender address taken from devAddr.addr.
// Returns OC_STACK_KEEP_TRANSACTION.
731 OCStackApplicationResult subscribePresenceCallback(void* ctx,
732 OCDoHandle /*handle*/,
733 OCClientResponse* clientResponse)
735 ClientCallbackContext::SubscribePresenceContext* context =
736 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
739 * This a hack while we rethink presence subscription.
741 std::string url = clientResponse->devAddr.addr;
743 std::thread exec(context->callback, clientResponse->result,
744 clientResponse->sequenceNumber, url);
748 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications (OC_REST_PRESENCE) from a host, with
// subscribePresenceCallback registered as the handler.
//
// handle           passed to OCDoResource as its handle argument
// host             prefix of the request URI; OC_RSRVD_PRESENCE_URI is appended to it
// resourceType     when non-empty, appended as "?rt=<resourceType>"
// connectivityType transport selection forwarded to OCDoResource
// presenceHandler  user callback stored in the heap-allocated SubscribePresenceContext
//
// The request is sent with OC_LOW_QOS and no header options. Returns the OCDoResource
// result, OC_STACK_INVALID_PARAM from the early guard, or OC_STACK_ERROR.
// NOTE(review): the guard condition is presumably an empty presenceHandler, and the
// OC_STACK_ERROR path presumably a failed m_csdkLock.lock() - confirm both.
// NOTE(review): no std::lock_guard on *cLock is visible before the OCDoResource call,
// unlike the sibling request methods - confirm the stack call is serialized.
751 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
752 const std::string& host, const std::string& resourceType,
753 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
757 return OC_STACK_INVALID_PARAM;
// The deleter lambda given to OCCallbackData deletes the context allocated here.
760 ClientCallbackContext::SubscribePresenceContext* ctx =
761 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
762 OCCallbackData cbdata(
763 static_cast<void*>(ctx),
764 subscribePresenceCallback,
766 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
769 auto cLock = m_csdkLock.lock();
771 std::ostringstream os;
772 os << host << OC_RSRVD_PRESENCE_URI;
// Restrict the subscription to one resource type when the caller names one.
774 if(!resourceType.empty())
776 os << "?rt=" << resourceType;
782 return OC_STACK_ERROR;
785 return OCDoResource(handle, OC_REST_PRESENCE,
786 os.str().c_str(), nullptr,
787 nullptr, connectivityType,
788 OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels a presence subscription by calling OCCancel on its handle with OC_LOW_QOS
// and no header options, while holding the shared recursive mutex.
// The result is the OCCancel result, or OC_STACK_ERROR on the fallback path.
// NOTE(review): the fallback is presumably a failed m_csdkLock.lock() - confirm.
791 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
793 OCStackResult result;
794 auto cLock = m_csdkLock.lock();
798 std::lock_guard<std::recursive_mutex> lock(*cLock);
799 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
803 result = OC_STACK_ERROR;
// Takes a QualityOfService by non-const reference and returns an OCStackResult.
// NOTE(review): presumably reports the configured default QoS through `qos` - confirm
// against the function body.
809 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
815 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
816 const HeaderOptions& headerOptions)
820 if( headerOptions.size() == 0)
825 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
827 options[i] = OCHeaderOption(OC_COAP_ID,
829 it->getOptionData().length() + 1,
830 reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));