1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
32 InProcClientWrapper::InProcClientWrapper(
33 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34 : m_threadRun(false), m_csdkLock(csdkLock),
37 // if the config type is server, we ought to never get called. If the config type
38 // is both, we count on the server to run the thread and do the initialize
40 if(m_cfg.mode == ModeType::Client)
42 OCTransportFlags serverFlags =
43 static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44 OCTransportFlags clientFlags =
45 static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46 OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
48 if(OC_STACK_OK != result)
50 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
54 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
58 InProcClientWrapper::~InProcClientWrapper()
60 if(m_threadRun && m_listeningThread.joinable())
63 m_listeningThread.join();
66 // only stop if we are the ones who actually called 'init'. We are counting
67 // on the server to do the stop.
68 if(m_cfg.mode == ModeType::Client)
74 void InProcClientWrapper::listeningFunc()
79 auto cLock = m_csdkLock.lock();
82 std::lock_guard<std::recursive_mutex> lock(*cLock);
87 result = OC_STACK_ERROR;
90 if(result != OC_STACK_OK)
92 // TODO: do something with result if failed?
95 // To minimize CPU utilization we may wish to do this with sleep
96 std::this_thread::sleep_for(std::chrono::milliseconds(10));
100 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
102 if(clientResponse->payload == nullptr ||
104 clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105 clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106 clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
110 //OCPayloadDestroy(clientResponse->payload);
111 return OCRepresentation();
115 oc.setPayload(clientResponse->payload);
116 //OCPayloadDestroy(clientResponse->payload);
118 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119 if(it == oc.representations().end())
121 return OCRepresentation();
124 // first one is considered the root, everything else is considered a child of this one.
125 OCRepresentation root = *it;
128 std::for_each(it, oc.representations().end(),
129 [&root](const OCRepresentation& repItr)
130 {root.addChild(repItr);});
135 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
136 OCClientResponse* clientResponse)
138 ClientCallbackContext::ListenContext* context =
139 static_cast<ClientCallbackContext::ListenContext*>(ctx);
141 if(clientResponse->result != OC_STACK_OK)
143 oclog() << "listenCallback(): failed to create resource. clientResponse: "
144 << clientResponse->result
147 return OC_STACK_KEEP_TRANSACTION;
150 if(!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
152 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
154 return OC_STACK_KEEP_TRANSACTION;
157 auto clientWrapper = context->clientWrapper.lock();
161 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
163 return OC_STACK_KEEP_TRANSACTION;
166 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
167 reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
168 // loop to ensure valid construction of all resources
169 for(auto resource : container.Resources())
171 std::thread exec(context->callback, resource);
176 return OC_STACK_KEEP_TRANSACTION;
179 OCStackResult InProcClientWrapper::ListenForResource(
180 const std::string& serviceUrl, // unused
181 const std::string& resourceType,
182 OCConnectivityType connectivityType,
183 FindCallback& callback, QualityOfService QoS)
187 return OC_STACK_INVALID_PARAM;
190 OCStackResult result;
192 ClientCallbackContext::ListenContext* context =
193 new ClientCallbackContext::ListenContext(callback, shared_from_this());
194 OCCallbackData cbdata(
195 static_cast<void*>(context),
197 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
200 auto cLock = m_csdkLock.lock();
203 std::lock_guard<std::recursive_mutex> lock(*cLock);
204 result = OCDoResource(nullptr, OC_REST_DISCOVER,
205 resourceType.c_str(),
206 nullptr, nullptr, connectivityType,
207 static_cast<OCQualityOfService>(QoS),
214 result = OC_STACK_ERROR;
219 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
220 OCClientResponse* clientResponse)
222 ClientCallbackContext::DeviceListenContext* context =
223 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
227 OCRepresentation rep = parseGetSetCallback(clientResponse);
228 std::thread exec(context->callback, rep);
231 catch(OC::OCException& e)
233 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
234 <<e.what() <<std::flush;
237 return OC_STACK_KEEP_TRANSACTION;
240 OCStackResult InProcClientWrapper::ListenForDevice(
241 const std::string& serviceUrl, // unused
242 const std::string& deviceURI,
243 OCConnectivityType connectivityType,
244 FindDeviceCallback& callback,
245 QualityOfService QoS)
249 return OC_STACK_INVALID_PARAM;
251 OCStackResult result;
253 ClientCallbackContext::DeviceListenContext* context =
254 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
255 OCCallbackData cbdata(
256 static_cast<void*>(context),
257 listenDeviceCallback,
258 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
261 auto cLock = m_csdkLock.lock();
264 std::lock_guard<std::recursive_mutex> lock(*cLock);
265 result = OCDoResource(nullptr, OC_REST_DISCOVER,
267 nullptr, nullptr, connectivityType,
268 static_cast<OCQualityOfService>(QoS),
275 result = OC_STACK_ERROR;
280 void parseServerHeaderOptions(OCClientResponse* clientResponse,
281 HeaderOptions& serverHeaderOptions)
285 // Parse header options from server
287 std::string optionData;
289 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
291 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
292 optionData = reinterpret_cast<const char*>
293 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
294 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
295 serverHeaderOptions.push_back(headerOption);
300 // clientResponse is invalid
301 // TODO check proper logging
302 std::cout << " Invalid response " << std::endl;
306 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
307 OCClientResponse* clientResponse)
309 ClientCallbackContext::GetContext* context =
310 static_cast<ClientCallbackContext::GetContext*>(ctx);
312 OCRepresentation rep;
313 HeaderOptions serverHeaderOptions;
314 OCStackResult result = clientResponse->result;
315 if(result == OC_STACK_OK)
317 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
320 rep = parseGetSetCallback(clientResponse);
322 catch(OC::OCException& e)
328 std::thread exec(context->callback, serverHeaderOptions, rep, result);
330 return OC_STACK_DELETE_TRANSACTION;
333 OCStackResult InProcClientWrapper::GetResourceRepresentation(
334 const OCDevAddr& devAddr,
335 const std::string& resourceUri,
336 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
337 GetCallback& callback, QualityOfService QoS)
341 return OC_STACK_INVALID_PARAM;
343 OCStackResult result;
344 ClientCallbackContext::GetContext* ctx =
345 new ClientCallbackContext::GetContext(callback);
346 OCCallbackData cbdata(
347 static_cast<void*>(ctx),
349 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
352 std::string uri = assembleSetResourceUri(resourceUri, queryParams);
354 auto cLock = m_csdkLock.lock();
358 std::lock_guard<std::recursive_mutex> lock(*cLock);
359 OCHeaderOption options[MAX_HEADER_OPTIONS];
361 result = OCDoResource(
362 nullptr, OC_REST_GET,
366 static_cast<OCQualityOfService>(QoS),
368 assembleHeaderOptions(options, headerOptions),
369 headerOptions.size());
374 result = OC_STACK_ERROR;
380 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
381 OCClientResponse* clientResponse)
383 ClientCallbackContext::SetContext* context =
384 static_cast<ClientCallbackContext::SetContext*>(ctx);
385 OCRepresentation attrs;
386 HeaderOptions serverHeaderOptions;
388 OCStackResult result = clientResponse->result;
389 if (OC_STACK_OK == result ||
390 OC_STACK_RESOURCE_CREATED == result ||
391 OC_STACK_RESOURCE_DELETED == result)
393 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
396 attrs = parseGetSetCallback(clientResponse);
398 catch(OC::OCException& e)
404 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
406 return OC_STACK_DELETE_TRANSACTION;
409 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
410 const QueryParamsMap& queryParams)
412 if(uri.back() == '/')
414 uri.resize(uri.size()-1);
417 ostringstream paramsList;
418 if(queryParams.size() > 0)
423 for(auto& param : queryParams)
425 paramsList << param.first <<'='<<param.second<<';';
428 std::string queryString = paramsList.str();
429 if(queryString.back() == ';')
431 queryString.resize(queryString.size() - 1);
434 std::string ret = uri + queryString;
438 OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
440 MessageContainer ocInfo;
441 ocInfo.addRepresentation(rep);
442 return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
445 OCStackResult InProcClientWrapper::PostResourceRepresentation(
446 const OCDevAddr& devAddr,
447 const std::string& uri,
448 const OCRepresentation& rep,
449 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
450 PostCallback& callback, QualityOfService QoS)
454 return OC_STACK_INVALID_PARAM;
456 OCStackResult result;
457 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
458 OCCallbackData cbdata(
459 static_cast<void*>(ctx),
461 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
464 std::string url = assembleSetResourceUri(uri, queryParams);
466 auto cLock = m_csdkLock.lock();
470 std::lock_guard<std::recursive_mutex> lock(*cLock);
471 OCHeaderOption options[MAX_HEADER_OPTIONS];
473 result = OCDoResource(nullptr, OC_REST_POST,
474 url.c_str(), &devAddr,
475 assembleSetResourcePayload(rep),
477 static_cast<OCQualityOfService>(QoS),
479 assembleHeaderOptions(options, headerOptions),
480 headerOptions.size());
485 result = OC_STACK_ERROR;
491 OCStackResult InProcClientWrapper::PutResourceRepresentation(
492 const OCDevAddr& devAddr,
493 const std::string& uri,
494 const OCRepresentation& rep,
495 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
496 PutCallback& callback, QualityOfService QoS)
500 return OC_STACK_INVALID_PARAM;
502 OCStackResult result;
503 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
504 OCCallbackData cbdata(
505 static_cast<void*>(ctx),
507 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
510 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
512 auto cLock = m_csdkLock.lock();
516 std::lock_guard<std::recursive_mutex> lock(*cLock);
518 OCHeaderOption options[MAX_HEADER_OPTIONS];
520 result = OCDoResource(&handle, OC_REST_PUT,
521 url.c_str(), &devAddr,
522 assembleSetResourcePayload(rep),
524 static_cast<OCQualityOfService>(QoS),
526 assembleHeaderOptions(options, headerOptions),
527 headerOptions.size());
532 result = OC_STACK_ERROR;
538 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
539 OCClientResponse* clientResponse)
541 ClientCallbackContext::DeleteContext* context =
542 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
543 HeaderOptions serverHeaderOptions;
545 if(clientResponse->result == OC_STACK_OK)
547 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
549 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
551 return OC_STACK_DELETE_TRANSACTION;
554 OCStackResult InProcClientWrapper::DeleteResource(
555 const OCDevAddr& devAddr,
556 const std::string& uri,
557 const HeaderOptions& headerOptions, DeleteCallback& callback, QualityOfService QoS)
561 return OC_STACK_INVALID_PARAM;
563 OCStackResult result;
564 ClientCallbackContext::DeleteContext* ctx =
565 new ClientCallbackContext::DeleteContext(callback);
566 OCCallbackData cbdata(
567 static_cast<void*>(ctx),
568 deleteResourceCallback,
569 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
572 auto cLock = m_csdkLock.lock();
576 OCHeaderOption options[MAX_HEADER_OPTIONS];
578 std::lock_guard<std::recursive_mutex> lock(*cLock);
580 result = OCDoResource(nullptr, OC_REST_DELETE,
581 uri.c_str(), &devAddr,
584 static_cast<OCQualityOfService>(m_cfg.QoS),
586 assembleHeaderOptions(options, headerOptions),
587 headerOptions.size());
592 result = OC_STACK_ERROR;
598 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
599 OCClientResponse* clientResponse)
601 ClientCallbackContext::ObserveContext* context =
602 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
603 OCRepresentation attrs;
604 HeaderOptions serverHeaderOptions;
605 uint32_t sequenceNumber = clientResponse->sequenceNumber;
606 OCStackResult result = clientResponse->result;
607 if(clientResponse->result == OC_STACK_OK)
609 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
612 attrs = parseGetSetCallback(clientResponse);
614 catch(OC::OCException& e)
619 std::thread exec(context->callback, serverHeaderOptions, attrs,
620 result, sequenceNumber);
622 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
624 return OC_STACK_DELETE_TRANSACTION;
626 return OC_STACK_KEEP_TRANSACTION;
629 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
630 const OCDevAddr& devAddr,
631 const std::string& uri,
632 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
633 ObserveCallback& callback, QualityOfService QoS)
637 return OC_STACK_INVALID_PARAM;
639 OCStackResult result;
641 ClientCallbackContext::ObserveContext* ctx =
642 new ClientCallbackContext::ObserveContext(callback);
643 OCCallbackData cbdata(
644 static_cast<void*>(ctx),
645 observeResourceCallback,
646 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
650 if (observeType == ObserveType::Observe)
652 method = OC_REST_OBSERVE;
654 else if (observeType == ObserveType::ObserveAll)
656 method = OC_REST_OBSERVE_ALL;
660 method = OC_REST_OBSERVE_ALL;
663 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
665 auto cLock = m_csdkLock.lock();
669 std::lock_guard<std::recursive_mutex> lock(*cLock);
670 OCHeaderOption options[MAX_HEADER_OPTIONS];
672 result = OCDoResource(handle, method,
673 url.c_str(), &devAddr,
676 static_cast<OCQualityOfService>(QoS),
678 assembleHeaderOptions(options, headerOptions),
679 headerOptions.size());
684 return OC_STACK_ERROR;
690 OCStackResult InProcClientWrapper::CancelObserveResource(
692 const std::string& host, // unused
693 const std::string& uri, // unused
694 const HeaderOptions& headerOptions,
695 QualityOfService QoS)
697 OCStackResult result;
698 auto cLock = m_csdkLock.lock();
702 std::lock_guard<std::recursive_mutex> lock(*cLock);
703 OCHeaderOption options[MAX_HEADER_OPTIONS];
705 result = OCCancel(handle,
706 static_cast<OCQualityOfService>(QoS),
707 assembleHeaderOptions(options, headerOptions),
708 headerOptions.size());
712 result = OC_STACK_ERROR;
718 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
719 OCClientResponse* clientResponse)
721 ClientCallbackContext::SubscribePresenceContext* context =
722 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
725 * This a hack while we rethink presence subscription.
727 std::string url = clientResponse->devAddr.addr;
729 std::thread exec(context->callback, clientResponse->result,
730 clientResponse->sequenceNumber, url);
734 return OC_STACK_KEEP_TRANSACTION;
737 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
738 const std::string& host, const std::string& resourceType,
739 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
743 return OC_STACK_INVALID_PARAM;
746 ClientCallbackContext::SubscribePresenceContext* ctx =
747 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
748 OCCallbackData cbdata(
749 static_cast<void*>(ctx),
750 subscribePresenceCallback,
752 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
755 auto cLock = m_csdkLock.lock();
757 std::ostringstream os;
758 os << host << OC_PRESENCE_URI;;
760 if(!resourceType.empty())
762 os << "?rt=" << resourceType;
768 return OC_STACK_ERROR;
771 return OCDoResource(handle, OC_REST_PRESENCE,
772 os.str().c_str(), nullptr,
773 nullptr, connectivityType,
774 OC_LOW_QOS, &cbdata, NULL, 0);
777 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
779 OCStackResult result;
780 auto cLock = m_csdkLock.lock();
784 std::lock_guard<std::recursive_mutex> lock(*cLock);
785 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
789 result = OC_STACK_ERROR;
795 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
801 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
802 const HeaderOptions& headerOptions)
806 if( headerOptions.size() == 0)
811 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
813 options[i] = OCHeaderOption(OC_COAP_ID,
815 it->getOptionData().length() + 1,
816 reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));