1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
// Builds the in-process client wrapper from the shared C SDK lock and the platform config.
// In client-only mode (m_cfg.mode == ModeType::Client) the C stack is initialized with
// OCInit1(OC_CLIENT, ...) and a thread running listeningFunc is created; an
// InitializeException (STACK_INIT_ERROR) is thrown when OCInit1 does not return OC_STACK_OK.
// NOTE(review): brace-only and other short lines are not visible in this view, so the exact
// extent of the mode check's body should be confirmed against the full file.
32 InProcClientWrapper::InProcClientWrapper(
33 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34 : m_threadRun(false), m_csdkLock(csdkLock),
37 // if the config type is server, we ought to never get called. If the config type
38 // is both, we count on the server to run the thread and do the initialize
40 if(m_cfg.mode == ModeType::Client)
// Mask both configured connectivity values with CT_MASK_FLAGS before passing them to OCInit1.
42 OCTransportFlags serverFlags =
43 static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44 OCTransportFlags clientFlags =
45 static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46 OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
48 if(OC_STACK_OK != result)
50 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
// Run listeningFunc on a std::thread stored in this object.
54 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Destructor: joins the listening thread when it is flagged as running and is joinable,
// then performs client-mode-only teardown.
// NOTE(review): the body of the mode check (and any statement preceding the join) is not
// visible in this view; per the existing comment it is expected to stop the stack — confirm.
58 InProcClientWrapper::~InProcClientWrapper()
60 if(m_threadRun && m_listeningThread.joinable())
63 m_listeningThread.join();
66 // only stop if we are the ones who actually called 'init'. We are counting
67 // on the server to do the stop.
68 if(m_cfg.mode == ModeType::Client)
// Entry point of the thread created in the constructor.
// Promotes the weak C SDK lock, holds the recursive mutex through a lock_guard, records
// OC_STACK_ERROR on the fallback path, and sleeps for 10 ms.
// NOTE(review): the loop header and the call made while the mutex is held are not visible
// in this view; presumably this drives the stack's processing — confirm against the full file.
74 void InProcClientWrapper::listeningFunc()
// Promote the weak_ptr to the shared recursive mutex.
79 auto cLock = m_csdkLock.lock();
82 std::lock_guard<std::recursive_mutex> lock(*cLock);
87 result = OC_STACK_ERROR;
90 if(result != OC_STACK_OK)
92 // TODO: do something with result if failed?
95 // To minimize CPU utilization we may wish to do this with sleep
96 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Converts the payload of a client response into an OCRepresentation.
// Returns a default-constructed OCRepresentation when the payload is null, when its type is
// none of PAYLOAD_TYPE_DEVICE / PAYLOAD_TYPE_PLATFORM / PAYLOAD_TYPE_REPRESENTATION, or when
// parsing yields no representations. Otherwise the first parsed representation becomes the
// root, is tagged with the response's device address and resource URI, and the
// representations in [it, end) are attached to it as children.
// NOTE(review): the existing comment says everything after the first element is a child, so
// an iterator advance before the for_each is presumably present but is not visible in this
// view — confirm against the full file.
100 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
102 if(clientResponse->payload == nullptr ||
104 clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105 clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106 clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
110 //OCPayloadDestroy(clientResponse->payload);
111 return OCRepresentation();
// `oc` is declared on a line not visible here; it is fed the raw payload for parsing.
115 oc.setPayload(clientResponse->payload);
116 //OCPayloadDestroy(clientResponse->payload);
118 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119 if(it == oc.representations().end())
121 return OCRepresentation();
124 // first one is considered the root, everything else is considered a child of this one.
125 OCRepresentation root = *it;
126 root.setDevAddr(clientResponse->devAddr);
127 root.setUri(clientResponse->resourceUri);
// Attach every representation in [it, end) to the root as a child.
130 std::for_each(it, oc.representations().end(),
131 [&root](const OCRepresentation& repItr)
132 {root.addChild(repItr);});
// C SDK response callback installed by ListenForResource (cbdata.cb).
// ctx is the ClientCallbackContext::ListenContext supplied with the request.
// Logs and ignores a response whose result is not OC_STACK_OK or whose payload is null or
// not PAYLOAD_TYPE_DISCOVERY; also logs when no shared_ptr to the client wrapper can be
// obtained from the context. Otherwise builds a ListenOCContainer from the discovery
// payload and constructs one std::thread per contained resource to invoke the user callback.
// std::exception (and derived) is caught and logged.
// Every visible return yields OC_STACK_KEEP_TRANSACTION.
// NOTE(review): what happens to each std::thread after construction (join/detach) is not
// visible in this view — confirm against the full file.
137 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
138 OCClientResponse* clientResponse)
140 ClientCallbackContext::ListenContext* context =
141 static_cast<ClientCallbackContext::ListenContext*>(ctx);
143 if(clientResponse->result != OC_STACK_OK)
145 oclog() << "listenCallback(): failed to create resource. clientResponse: "
146 << clientResponse->result
149 return OC_STACK_KEEP_TRANSACTION;
152 if(!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
154 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
156 return OC_STACK_KEEP_TRANSACTION;
// Try to obtain the client wrapper referenced by the context.
159 auto clientWrapper = context->clientWrapper.lock();
163 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
165 return OC_STACK_KEEP_TRANSACTION;
169 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
170 reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
171 // loop to ensure valid construction of all resources
// NOTE(review): `auto resource` takes each element by value, so every element is copied.
172 for(auto resource : container.Resources())
174 std::thread exec(context->callback, resource);
178 catch (std::exception &e){
179 oclog() << "Exception in listCallback, ignoring response: "
180 << e.what() << std::flush;
184 return OC_STACK_KEEP_TRANSACTION;
// Issues an OC_REST_DISCOVER request for the URI formed by serviceUrl followed directly by
// resourceType.
//   serviceUrl       - leading part of the request URI
//   resourceType     - appended to serviceUrl without a separator
//   connectivityType - forwarded to OCDoResource
//   callback         - stored, with shared_from_this(), in a heap-allocated ListenContext
//   QoS              - cast to OCQualityOfService for the request
// The context is handed to the C SDK through cbdata together with listenCallback as the
// response handler and a deleter (cbdata.cd) that deletes the ListenContext.
// Visible results: OC_STACK_INVALID_PARAM from an argument check whose condition is not
// visible in this view (TODO confirm what it tests), and OC_STACK_ERROR on the fallback path
// of the lock handling.
187 OCStackResult InProcClientWrapper::ListenForResource(
188 const std::string& serviceUrl,
189 const std::string& resourceType,
190 OCConnectivityType connectivityType,
191 FindCallback& callback, QualityOfService QoS)
195 return OC_STACK_INVALID_PARAM;
198 OCStackResult result;
199 ostringstream resourceUri;
200 resourceUri << serviceUrl << resourceType;
202 ClientCallbackContext::ListenContext* context =
203 new ClientCallbackContext::ListenContext(callback, shared_from_this());
204 OCCallbackData cbdata;
205 cbdata.context = (void*)context;
206 cbdata.cb = listenCallback;
207 cbdata.cd = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
// Promote the weak C SDK lock and hold the recursive mutex for the request.
209 auto cLock = m_csdkLock.lock();
212 std::lock_guard<std::recursive_mutex> lock(*cLock);
213 result = OCDoResource(nullptr, OC_REST_DISCOVER,
214 resourceUri.str().c_str(),
215 nullptr, nullptr, connectivityType,
216 static_cast<OCQualityOfService>(QoS),
223 result = OC_STACK_ERROR;
// C SDK response callback installed by ListenForDevice (cbdata.cb).
// ctx is the ClientCallbackContext::DeviceListenContext supplied with the request.
// Parses the response with parseGetSetCallback and constructs a std::thread that invokes
// the user callback with the resulting representation. OC::OCException is caught and
// logged. The visible return yields OC_STACK_KEEP_TRANSACTION.
// NOTE(review): what happens to the std::thread after construction is not visible here.
228 OCStackApplicationResult listenDeviceCallback(void* ctx,
229 OCDoHandle /*handle*/,
230 OCClientResponse* clientResponse)
232 ClientCallbackContext::DeviceListenContext* context =
233 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
237 OCRepresentation rep = parseGetSetCallback(clientResponse);
238 std::thread exec(context->callback, rep);
241 catch(OC::OCException& e)
243 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
244 <<e.what() <<std::flush;
247 return OC_STACK_KEEP_TRANSACTION;
// Issues an OC_REST_DISCOVER request for the URI formed by serviceUrl followed directly by
// deviceURI.
//   serviceUrl       - leading part of the request URI
//   deviceURI        - appended to serviceUrl without a separator
//   connectivityType - forwarded to OCDoResource
//   callback         - stored, with shared_from_this(), in a heap-allocated DeviceListenContext
//   QoS              - cast to OCQualityOfService for the request
// listenDeviceCallback is installed as the response handler and cbdata.cd deletes the
// DeviceListenContext.
// Visible results: OC_STACK_INVALID_PARAM from an argument check whose condition is not
// visible in this view (TODO confirm), and OC_STACK_ERROR on the fallback path of the lock
// handling.
250 OCStackResult InProcClientWrapper::ListenForDevice(
251 const std::string& serviceUrl,
252 const std::string& deviceURI,
253 OCConnectivityType connectivityType,
254 FindDeviceCallback& callback,
255 QualityOfService QoS)
259 return OC_STACK_INVALID_PARAM;
261 OCStackResult result;
262 ostringstream deviceUri;
263 deviceUri << serviceUrl << deviceURI;
265 ClientCallbackContext::DeviceListenContext* context =
266 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
267 OCCallbackData cbdata;
269 cbdata.context = (void*)context;
270 cbdata.cb = listenDeviceCallback;
271 cbdata.cd = [](void* c){delete (ClientCallbackContext::DeviceListenContext*)c;};
// Promote the weak C SDK lock and hold the recursive mutex for the request.
273 auto cLock = m_csdkLock.lock();
276 std::lock_guard<std::recursive_mutex> lock(*cLock);
277 result = OCDoResource(nullptr, OC_REST_DISCOVER,
278 deviceUri.str().c_str(),
279 nullptr, nullptr, connectivityType,
280 static_cast<OCQualityOfService>(QoS),
287 result = OC_STACK_ERROR;
// Appends one HeaderOption::OCHeaderOption to serverHeaderOptions for each vendor-specific
// header option received with clientResponse (numRcvdVendorSpecificHeaderOptions entries).
// Prints " Invalid response " to std::cout on the invalid-response path.
// NOTE(review): the option data is converted through const char*, so it is read up to the
// first NUL byte; no option length is consulted in the visible lines — confirm the data is
// always NUL-terminated.
292 void parseServerHeaderOptions(OCClientResponse* clientResponse,
293 HeaderOptions& serverHeaderOptions)
297 // Parse header options from server
299 std::string optionData;
301 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
303 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
304 optionData = reinterpret_cast<const char*>
305 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
306 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
307 serverHeaderOptions.push_back(headerOption);
312 // clientResponse is invalid
313 // TODO check proper logging
314 std::cout << " Invalid response " << std::endl;
// C SDK response callback installed by GetResourceRepresentation (cbdata.cb).
// ctx is the ClientCallbackContext::GetContext supplied with the request.
// When the response result is OC_STACK_OK, the server header options and the representation
// are parsed from the response; OC::OCException is caught (handler body not visible here).
// A std::thread is then constructed to invoke the user callback with the header options,
// the representation and the result. The visible return yields OC_STACK_DELETE_TRANSACTION.
// NOTE(review): what happens to the std::thread after construction is not visible here.
318 OCStackApplicationResult getResourceCallback(void* ctx,
319 OCDoHandle /*handle*/,
320 OCClientResponse* clientResponse)
322 ClientCallbackContext::GetContext* context =
323 static_cast<ClientCallbackContext::GetContext*>(ctx);
325 OCRepresentation rep;
326 HeaderOptions serverHeaderOptions;
327 OCStackResult result = clientResponse->result;
328 if(result == OC_STACK_OK)
330 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
333 rep = parseGetSetCallback(clientResponse);
335 catch(OC::OCException& e)
341 std::thread exec(context->callback, serverHeaderOptions, rep, result);
343 return OC_STACK_DELETE_TRANSACTION;
// Issues an OC_REST_GET request through OCDoResource.
//   devAddr       - target device address (its use falls on lines not visible in this view)
//   resourceUri   - resource path; combined with queryParams by assembleSetResourceUri
//   queryParams   - query parameters appended to the URI
//   headerOptions - converted with assembleHeaderOptions; its size() is passed as the count
//   callback      - stored in a heap-allocated GetContext handed to the C SDK
//   QoS           - cast to OCQualityOfService for the request
// getResourceCallback is installed as the response handler and cbdata.cd deletes the
// GetContext.
// Visible results: OC_STACK_INVALID_PARAM from an argument check whose condition is not
// visible in this view (TODO confirm), and OC_STACK_ERROR on the fallback path of the lock
// handling.
346 OCStackResult InProcClientWrapper::GetResourceRepresentation(
347 const OCDevAddr& devAddr,
348 const std::string& resourceUri,
349 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
350 GetCallback& callback, QualityOfService QoS)
354 return OC_STACK_INVALID_PARAM;
356 OCStackResult result;
357 ClientCallbackContext::GetContext* ctx =
358 new ClientCallbackContext::GetContext(callback);
359 OCCallbackData cbdata;
360 cbdata.context = (void*)ctx;
361 cbdata.cb = getResourceCallback;
362 cbdata.cd = [](void* c){delete (ClientCallbackContext::GetContext*)c;};
365 std::string uri = assembleSetResourceUri(resourceUri, queryParams);
// Promote the weak C SDK lock and hold the recursive mutex for the request.
367 auto cLock = m_csdkLock.lock();
371 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Stack buffer that assembleHeaderOptions fills for the duration of the call.
372 OCHeaderOption options[MAX_HEADER_OPTIONS];
374 result = OCDoResource(
375 nullptr, OC_REST_GET,
379 static_cast<OCQualityOfService>(QoS),
381 assembleHeaderOptions(options, headerOptions),
382 headerOptions.size());
387 result = OC_STACK_ERROR;
// C SDK response callback installed by PostResourceRepresentation and
// PutResourceRepresentation (cbdata.cb).
// ctx is the ClientCallbackContext::SetContext supplied with the request.
// When the result is OC_STACK_OK, OC_STACK_RESOURCE_CREATED or OC_STACK_RESOURCE_DELETED,
// the server header options and the representation are parsed from the response;
// OC::OCException is caught (handler body not visible here). A std::thread is then
// constructed to invoke the user callback with the header options, the attributes and the
// result. The visible return yields OC_STACK_DELETE_TRANSACTION.
// NOTE(review): what happens to the std::thread after construction is not visible here.
393 OCStackApplicationResult setResourceCallback(void* ctx,
394 OCDoHandle /*handle*/,
395 OCClientResponse* clientResponse)
397 ClientCallbackContext::SetContext* context =
398 static_cast<ClientCallbackContext::SetContext*>(ctx);
399 OCRepresentation attrs;
400 HeaderOptions serverHeaderOptions;
402 OCStackResult result = clientResponse->result;
403 if (OC_STACK_OK == result ||
404 OC_STACK_RESOURCE_CREATED == result ||
405 OC_STACK_RESOURCE_DELETED == result)
407 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
410 attrs = parseGetSetCallback(clientResponse);
412 catch(OC::OCException& e)
418 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
420 return OC_STACK_DELETE_TRANSACTION;
// Builds the request URI from a resource path and its query parameters.
// A trailing '/' is removed from uri. Each query parameter is written as "name=value"
// followed by ';', and a trailing ';' is removed from the resulting query string before it
// is appended to uri.
// NOTE(review): the statements executed when queryParams is non-empty (before the loop) and
// when the query string is empty are not visible in this view — confirm against the full
// file, including how the query string is introduced and what is returned in each case.
423 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
424 const QueryParamsMap& queryParams)
428 if(uri.back() == '/')
430 uri.resize(uri.size()-1);
434 std::ostringstream paramsList;
435 if(queryParams.size() > 0)
440 for(auto& param : queryParams)
442 paramsList << param.first <<'='<<param.second<<';';
445 std::string queryString = paramsList.str();
447 if(queryString.empty())
// Drop the separator left behind by the last parameter.
452 if(queryString.back() == ';')
454 queryString.resize(queryString.size() - 1);
457 std::string ret = uri + queryString;
// Builds the request payload for a representation: rep itself and then each of its direct
// children (rep.getChildren()) are added to a MessageContainer, whose getPayload() result is
// returned reinterpreted as OCPayload*.
// NOTE(review): ownership of the returned payload is not established by the visible code —
// confirm who releases it.
461 OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
463 MessageContainer ocInfo;
464 ocInfo.addRepresentation(rep);
465 for(const OCRepresentation& r : rep.getChildren())
467 ocInfo.addRepresentation(r);
470 return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
// Issues an OC_REST_POST request through OCDoResource.
//   devAddr       - target device address, passed by pointer to OCDoResource
//   uri           - resource path; combined with queryParams by assembleSetResourceUri
//   rep           - representation sent as the payload (via assembleSetResourcePayload)
//   queryParams   - query parameters appended to the URI
//   headerOptions - converted with assembleHeaderOptions; its size() is passed as the count
//   callback      - stored in a heap-allocated SetContext handed to the C SDK
//   QoS           - cast to OCQualityOfService for the request
// setResourceCallback is installed as the response handler and cbdata.cd deletes the
// SetContext.
// Visible results: OC_STACK_INVALID_PARAM from an argument check whose condition is not
// visible in this view (TODO confirm), and OC_STACK_ERROR on the fallback path of the lock
// handling.
473 OCStackResult InProcClientWrapper::PostResourceRepresentation(
474 const OCDevAddr& devAddr,
475 const std::string& uri,
476 const OCRepresentation& rep,
477 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
478 PostCallback& callback, QualityOfService QoS)
482 return OC_STACK_INVALID_PARAM;
484 OCStackResult result;
485 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
486 OCCallbackData cbdata;
487 cbdata.context = (void*)ctx;
488 cbdata.cb = setResourceCallback;
489 cbdata.cd = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
492 std::string url = assembleSetResourceUri(uri, queryParams);
// Promote the weak C SDK lock and hold the recursive mutex for the request.
494 auto cLock = m_csdkLock.lock();
498 std::lock_guard<std::recursive_mutex> lock(*cLock);
// Stack buffer that assembleHeaderOptions fills for the duration of the call.
499 OCHeaderOption options[MAX_HEADER_OPTIONS];
501 result = OCDoResource(nullptr, OC_REST_POST,
502 url.c_str(), &devAddr,
503 assembleSetResourcePayload(rep),
505 static_cast<OCQualityOfService>(QoS),
507 assembleHeaderOptions(options, headerOptions),
508 headerOptions.size());
513 result = OC_STACK_ERROR;
// Issues an OC_REST_PUT request through OCDoResource.
//   devAddr       - target device address, passed by pointer to OCDoResource
//   uri           - resource path; combined with queryParams by assembleSetResourceUri
//   rep           - representation sent as the payload (via assembleSetResourcePayload)
//   queryParams   - query parameters appended to the URI
//   headerOptions - converted with assembleHeaderOptions; its size() is passed as the count
//   callback      - stored in a heap-allocated SetContext handed to the C SDK
//   QoS           - cast to OCQualityOfService for the request
// setResourceCallback is installed as the response handler and cbdata.cd deletes the
// SetContext. Unlike the POST variant, the address of a local `handle` (declared on a line
// not visible in this view) is passed as the first OCDoResource argument.
// Visible results: OC_STACK_INVALID_PARAM from an argument check whose condition is not
// visible in this view (TODO confirm), and OC_STACK_ERROR on the fallback path of the lock
// handling.
519 OCStackResult InProcClientWrapper::PutResourceRepresentation(
520 const OCDevAddr& devAddr,
521 const std::string& uri,
522 const OCRepresentation& rep,
523 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
524 PutCallback& callback, QualityOfService QoS)
528 return OC_STACK_INVALID_PARAM;
530 OCStackResult result;
531 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
532 OCCallbackData cbdata;
533 cbdata.context = (void*)ctx;
534 cbdata.cb = setResourceCallback;
535 cbdata.cd = [](void* c){delete (ClientCallbackContext::SetContext*)c;};
// NOTE(review): the .c_str() round trip is redundant — the returned std::string is converted
// to a C string and copied back into a std::string (and would be cut at an embedded NUL).
// PostResourceRepresentation assigns the returned string directly.
538 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
// Promote the weak C SDK lock and hold the recursive mutex for the request.
540 auto cLock = m_csdkLock.lock();
544 std::lock_guard<std::recursive_mutex> lock(*cLock);
546 OCHeaderOption options[MAX_HEADER_OPTIONS];
548 result = OCDoResource(&handle, OC_REST_PUT,
549 url.c_str(), &devAddr,
550 assembleSetResourcePayload(rep),
552 static_cast<OCQualityOfService>(QoS),
554 assembleHeaderOptions(options, headerOptions),
555 headerOptions.size());
560 result = OC_STACK_ERROR;
// C SDK response callback installed by DeleteResource (cbdata.cb).
// ctx is the ClientCallbackContext::DeleteContext supplied with the request.
// Server header options are parsed only when the response result is OC_STACK_OK. A
// std::thread is then constructed to invoke the user callback with the header options and
// the response result. The visible return yields OC_STACK_DELETE_TRANSACTION.
// NOTE(review): what happens to the std::thread after construction is not visible here.
566 OCStackApplicationResult deleteResourceCallback(void* ctx,
567 OCDoHandle /*handle*/,
568 OCClientResponse* clientResponse)
570 ClientCallbackContext::DeleteContext* context =
571 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
572 HeaderOptions serverHeaderOptions;
574 if(clientResponse->result == OC_STACK_OK)
576 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
578 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
580 return OC_STACK_DELETE_TRANSACTION;
// Issues an OC_REST_DELETE request through OCDoResource.
//   devAddr       - target device address, passed by pointer to OCDoResource
//   uri           - resource URI, passed as-is (no query parameters are assembled)
//   headerOptions - converted with assembleHeaderOptions; its size() is passed as the count
//   callback      - stored in a heap-allocated DeleteContext handed to the C SDK
//   (unnamed QoS) - the parameter is not used; the request uses m_cfg.QoS instead
// deleteResourceCallback is installed as the response handler and cbdata.cd deletes the
// DeleteContext.
// Visible results: OC_STACK_INVALID_PARAM from an argument check whose condition is not
// visible in this view (TODO confirm), and OC_STACK_ERROR on the fallback path of the lock
// handling.
583 OCStackResult InProcClientWrapper::DeleteResource(
584 const OCDevAddr& devAddr,
585 const std::string& uri,
586 const HeaderOptions& headerOptions,
587 DeleteCallback& callback,
588 QualityOfService /*QoS*/)
592 return OC_STACK_INVALID_PARAM;
594 OCStackResult result;
595 ClientCallbackContext::DeleteContext* ctx =
596 new ClientCallbackContext::DeleteContext(callback);
597 OCCallbackData cbdata;
598 cbdata.context = (void*)ctx;
599 cbdata.cb = deleteResourceCallback;
600 cbdata.cd = [](void* c){delete (ClientCallbackContext::DeleteContext*)c;};
// Promote the weak C SDK lock and hold the recursive mutex for the request.
603 auto cLock = m_csdkLock.lock();
607 OCHeaderOption options[MAX_HEADER_OPTIONS];
609 std::lock_guard<std::recursive_mutex> lock(*cLock);
611 result = OCDoResource(nullptr, OC_REST_DELETE,
612 uri.c_str(), &devAddr,
// NOTE(review): uses the configured default QoS rather than a caller-supplied value; the
// other request functions in this file forward their QoS argument — confirm this is intended.
615 static_cast<OCQualityOfService>(m_cfg.QoS),
617 assembleHeaderOptions(options, headerOptions),
618 headerOptions.size());
623 result = OC_STACK_ERROR;
// C SDK response callback installed by ObserveResource (cbdata.cb).
// ctx is the ClientCallbackContext::ObserveContext supplied with the request.
// When the response result is OC_STACK_OK, the server header options and the representation
// are parsed from the response; OC::OCException is caught (handler body not visible here).
// A std::thread is then constructed to invoke the user callback with the header options,
// the attributes, the result and the response's sequence number.
// Returns OC_STACK_DELETE_TRANSACTION when the sequence number equals OC_OBSERVE_DEREGISTER,
// otherwise OC_STACK_KEEP_TRANSACTION.
// NOTE(review): what happens to the std::thread after construction is not visible here.
629 OCStackApplicationResult observeResourceCallback(void* ctx,
630 OCDoHandle /*handle*/,
631 OCClientResponse* clientResponse)
633 ClientCallbackContext::ObserveContext* context =
634 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
635 OCRepresentation attrs;
636 HeaderOptions serverHeaderOptions;
637 uint32_t sequenceNumber = clientResponse->sequenceNumber;
638 OCStackResult result = clientResponse->result;
639 if(clientResponse->result == OC_STACK_OK)
641 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
644 attrs = parseGetSetCallback(clientResponse);
646 catch(OC::OCException& e)
651 std::thread exec(context->callback, serverHeaderOptions, attrs,
652 result, sequenceNumber);
654 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
656 return OC_STACK_DELETE_TRANSACTION;
658 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation on a resource through OCDoResource.
//   observeType   - ObserveType::Observe selects OC_REST_OBSERVE; ObserveType::ObserveAll
//                   and the remaining fallback branch both select OC_REST_OBSERVE_ALL
//   handle        - passed through as the first OCDoResource argument
//   devAddr       - target device address, passed by pointer to OCDoResource
//   uri           - resource path; combined with queryParams by assembleSetResourceUri
//   queryParams   - query parameters appended to the URI
//   headerOptions - converted with assembleHeaderOptions; its size() is passed as the count
//   callback      - stored in a heap-allocated ObserveContext handed to the C SDK
//   QoS           - cast to OCQualityOfService for the request
// observeResourceCallback is installed as the response handler and cbdata.cd deletes the
// ObserveContext.
// Visible results: OC_STACK_INVALID_PARAM from an argument check whose condition is not
// visible in this view (TODO confirm), and OC_STACK_ERROR returned directly from the
// fallback path of the lock handling.
661 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
662 const OCDevAddr& devAddr,
663 const std::string& uri,
664 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
665 ObserveCallback& callback, QualityOfService QoS)
669 return OC_STACK_INVALID_PARAM;
671 OCStackResult result;
673 ClientCallbackContext::ObserveContext* ctx =
674 new ClientCallbackContext::ObserveContext(callback);
675 OCCallbackData cbdata;
676 cbdata.context = (void*)ctx;
677 cbdata.cb = observeResourceCallback;
678 cbdata.cd = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
// Map the C++ observe type onto the C SDK request method.
682 if (observeType == ObserveType::Observe)
684 method = OC_REST_OBSERVE;
686 else if (observeType == ObserveType::ObserveAll)
688 method = OC_REST_OBSERVE_ALL;
// NOTE(review): this assignment is identical to the ObserveAll one above, so the two
// branches could be merged — confirm no other value was intended for the fallback.
692 method = OC_REST_OBSERVE_ALL;
// NOTE(review): the .c_str() round trip is redundant — the returned std::string is converted
// to a C string and copied back into a std::string (and would be cut at an embedded NUL).
695 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
// Promote the weak C SDK lock and hold the recursive mutex for the request.
697 auto cLock = m_csdkLock.lock();
701 std::lock_guard<std::recursive_mutex> lock(*cLock);
702 OCHeaderOption options[MAX_HEADER_OPTIONS];
704 result = OCDoResource(handle, method,
705 url.c_str(), &devAddr,
708 static_cast<OCQualityOfService>(QoS),
710 assembleHeaderOptions(options, headerOptions),
711 headerOptions.size());
716 return OC_STACK_ERROR;
// Cancels an observation by calling OCCancel on `handle` while holding the C SDK lock.
//   handle        - passed to OCCancel; its parameter declaration is on a line not visible
//                   in this view
//   (unnamed host, uri) - accepted but not used
//   headerOptions - converted with assembleHeaderOptions; its size() is passed as the count
//   QoS           - cast to OCQualityOfService for the cancel request
// OC_STACK_ERROR is assigned on the fallback path of the lock handling.
722 OCStackResult InProcClientWrapper::CancelObserveResource(
724 const std::string& /*host*/,
725 const std::string& /*uri*/,
726 const HeaderOptions& headerOptions,
727 QualityOfService QoS)
729 OCStackResult result;
// Promote the weak C SDK lock and hold the recursive mutex for the request.
730 auto cLock = m_csdkLock.lock();
734 std::lock_guard<std::recursive_mutex> lock(*cLock);
735 OCHeaderOption options[MAX_HEADER_OPTIONS];
737 result = OCCancel(handle,
738 static_cast<OCQualityOfService>(QoS),
739 assembleHeaderOptions(options, headerOptions),
740 headerOptions.size());
744 result = OC_STACK_ERROR;
// C SDK response callback installed by SubscribePresence (cbdata.cb).
// ctx is the ClientCallbackContext::SubscribePresenceContext supplied with the request.
// Copies the responder's address string (clientResponse->devAddr.addr) and constructs a
// std::thread that invokes the user callback with the response result, the sequence number
// and that address. The visible return yields OC_STACK_KEEP_TRANSACTION.
// NOTE(review): what happens to the std::thread after construction is not visible here.
750 OCStackApplicationResult subscribePresenceCallback(void* ctx,
751 OCDoHandle /*handle*/,
752 OCClientResponse* clientResponse)
754 ClientCallbackContext::SubscribePresenceContext* context =
755 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
758 * This a hack while we rethink presence subscription.
760 std::string url = clientResponse->devAddr.addr;
762 std::thread exec(context->callback, clientResponse->result,
763 clientResponse->sequenceNumber, url);
767 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications by issuing OC_REST_PRESENCE through OCDoResource.
//   handle           - passed through as the first OCDoResource argument
//   host             - leading part of the request URI; OC_RSRVD_PRESENCE_URI is appended
//   resourceType     - when non-empty, appended to the URI as "?rt=<resourceType>"
//   connectivityType - forwarded to OCDoResource
//   presenceHandler  - stored in a heap-allocated SubscribePresenceContext handed to the C SDK
// subscribePresenceCallback is installed as the response handler and cbdata.cd deletes the
// context. The request is always sent with OC_LOW_QOS and no header options.
// Visible results: OC_STACK_INVALID_PARAM from an argument check whose condition is not
// visible in this view (TODO confirm), OC_STACK_ERROR from a second early return, otherwise
// whatever OCDoResource returns.
// NOTE(review): m_csdkLock is promoted here, but no std::lock_guard on the recursive mutex
// is visible before the OCDoResource call, unlike the other request functions in this
// file — confirm whether the C SDK call is meant to run without holding the lock.
770 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
771 const std::string& host, const std::string& resourceType,
772 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
776 return OC_STACK_INVALID_PARAM;
779 ClientCallbackContext::SubscribePresenceContext* ctx =
780 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
781 OCCallbackData cbdata;
782 cbdata.context = (void*)ctx;
783 cbdata.cb = subscribePresenceCallback;
784 cbdata.cd = [](void* c){delete (ClientCallbackContext::SubscribePresenceContext*)c;};
787 auto cLock = m_csdkLock.lock();
// Build "<host><presence URI>[?rt=<resourceType>]".
789 std::ostringstream os;
790 os << host << OC_RSRVD_PRESENCE_URI;
792 if(!resourceType.empty())
794 os << "?rt=" << resourceType;
800 return OC_STACK_ERROR;
803 return OCDoResource(handle, OC_REST_PRESENCE,
804 os.str().c_str(), nullptr,
805 nullptr, connectivityType,
806 OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels a presence subscription: calls OCCancel(handle, OC_LOW_QOS, NULL, 0) while
// holding the C SDK recursive mutex. OC_STACK_ERROR is assigned on the fallback path of the
// lock handling.
//   handle - the operation handle to cancel
809 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
811 OCStackResult result;
// Promote the weak C SDK lock and hold the recursive mutex for the request.
812 auto cLock = m_csdkLock.lock();
816 std::lock_guard<std::recursive_mutex> lock(*cLock);
817 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
821 result = OC_STACK_ERROR;
// Reports a quality-of-service value through the `qos` out-parameter and returns a stack
// result code.
// NOTE(review): the body is not visible in this view; presumably it copies the configured
// default (m_cfg.QoS) into qos — confirm against the full file.
827 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
// Fills the caller-provided `options` array from the C++ headerOptions collection.
// For each entry, options[i] is reset to a default OCHeaderOption, then given protocol
// OC_COAP_ID, the entry's option ID, an optionLength of the data length plus one, and a
// strcpy of the entry's data string into optionData.
// NOTE(review): what is returned (including for an empty headerOptions) and how `i` is
// declared and advanced fall on lines not visible in this view — confirm against the full
// file.
// NOTE(review): strcpy is unbounded, and no check of the data length against the capacity
// of optionData, or of the entry count against the size of the options array, is visible
// here — confirm that callers guarantee both limits.
833 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
834 const HeaderOptions& headerOptions)
838 if( headerOptions.size() == 0)
843 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
845 options[i] = OCHeaderOption();
846 options[i].protocolID = OC_COAP_ID;
847 options[i].optionID = it->getOptionID();
// +1 accounts for the terminating NUL that strcpy writes below.
848 options[i].optionLength = it->getOptionData().length() + 1;
849 strcpy((char*)options[i].optionData, (it->getOptionData().c_str()));