1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include "ocpayload.h"
27 #include <OCSerialization.h>
32 InProcClientWrapper::InProcClientWrapper(
33 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
34 : m_threadRun(false), m_csdkLock(csdkLock),
37 // if the config type is server, we ought to never get called. If the config type
38 // is both, we count on the server to run the thread and do the initialize
40 if(m_cfg.mode == ModeType::Client)
42 OCTransportFlags serverFlags =
43 static_cast<OCTransportFlags>(m_cfg.serverConnectivity & CT_MASK_FLAGS);
44 OCTransportFlags clientFlags =
45 static_cast<OCTransportFlags>(m_cfg.clientConnectivity & CT_MASK_FLAGS);
46 OCStackResult result = OCInit1(OC_CLIENT, serverFlags, clientFlags);
48 if(OC_STACK_OK != result)
50 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
54 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
58 InProcClientWrapper::~InProcClientWrapper()
60 if(m_threadRun && m_listeningThread.joinable())
63 m_listeningThread.join();
66 // only stop if we are the ones who actually called 'init'. We are counting
67 // on the server to do the stop.
68 if(m_cfg.mode == ModeType::Client)
74 void InProcClientWrapper::listeningFunc()
79 auto cLock = m_csdkLock.lock();
82 std::lock_guard<std::recursive_mutex> lock(*cLock);
87 result = OC_STACK_ERROR;
90 if(result != OC_STACK_OK)
92 // TODO: do something with result if failed?
95 // To minimize CPU utilization we may wish to do this with sleep
96 std::this_thread::sleep_for(std::chrono::milliseconds(10));
100 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
102 if(clientResponse->payload == nullptr ||
104 clientResponse->payload->type != PAYLOAD_TYPE_DEVICE &&
105 clientResponse->payload->type != PAYLOAD_TYPE_PLATFORM &&
106 clientResponse->payload->type != PAYLOAD_TYPE_REPRESENTATION
110 //OCPayloadDestroy(clientResponse->payload);
111 return OCRepresentation();
115 oc.setPayload(clientResponse->payload);
116 //OCPayloadDestroy(clientResponse->payload);
118 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
119 if(it == oc.representations().end())
121 return OCRepresentation();
124 // first one is considered the root, everything else is considered a child of this one.
125 OCRepresentation root = *it;
126 root.setDevAddr(clientResponse->devAddr);
129 std::for_each(it, oc.representations().end(),
130 [&root](const OCRepresentation& repItr)
131 {root.addChild(repItr);});
136 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle /*handle*/,
137 OCClientResponse* clientResponse)
139 ClientCallbackContext::ListenContext* context =
140 static_cast<ClientCallbackContext::ListenContext*>(ctx);
142 if(clientResponse->result != OC_STACK_OK)
144 oclog() << "listenCallback(): failed to create resource. clientResponse: "
145 << clientResponse->result
148 return OC_STACK_KEEP_TRANSACTION;
151 if(!clientResponse->payload || clientResponse->payload->type != PAYLOAD_TYPE_DISCOVERY)
153 oclog() << "listenCallback(): clientResponse payload was null or the wrong type"
155 return OC_STACK_KEEP_TRANSACTION;
158 auto clientWrapper = context->clientWrapper.lock();
162 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
164 return OC_STACK_KEEP_TRANSACTION;
167 ListenOCContainer container(clientWrapper, clientResponse->devAddr,
168 reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
169 // loop to ensure valid construction of all resources
170 for(auto resource : container.Resources())
172 std::thread exec(context->callback, resource);
177 return OC_STACK_KEEP_TRANSACTION;
180 OCStackResult InProcClientWrapper::ListenForResource(
181 const std::string& serviceUrl,
182 const std::string& resourceType,
183 OCConnectivityType connectivityType,
184 FindCallback& callback, QualityOfService QoS)
188 return OC_STACK_INVALID_PARAM;
191 OCStackResult result;
192 ostringstream resourceUri;
193 resourceUri << serviceUrl << resourceType;
195 ClientCallbackContext::ListenContext* context =
196 new ClientCallbackContext::ListenContext(callback, shared_from_this());
197 OCCallbackData cbdata(
198 static_cast<void*>(context),
200 [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);}
203 auto cLock = m_csdkLock.lock();
206 std::lock_guard<std::recursive_mutex> lock(*cLock);
207 result = OCDoResource(nullptr, OC_REST_DISCOVER,
208 resourceUri.str().c_str(),
209 nullptr, nullptr, connectivityType,
210 static_cast<OCQualityOfService>(QoS),
217 result = OC_STACK_ERROR;
222 OCStackApplicationResult listenDeviceCallback(void* ctx,
223 OCDoHandle /*handle*/,
224 OCClientResponse* clientResponse)
226 ClientCallbackContext::DeviceListenContext* context =
227 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
231 OCRepresentation rep = parseGetSetCallback(clientResponse);
232 std::thread exec(context->callback, rep);
235 catch(OC::OCException& e)
237 oclog() <<"Exception in listenDeviceCallback, ignoring response: "
238 <<e.what() <<std::flush;
241 return OC_STACK_KEEP_TRANSACTION;
244 OCStackResult InProcClientWrapper::ListenForDevice(
245 const std::string& serviceUrl,
246 const std::string& deviceURI,
247 OCConnectivityType connectivityType,
248 FindDeviceCallback& callback,
249 QualityOfService QoS)
253 return OC_STACK_INVALID_PARAM;
255 OCStackResult result;
256 ostringstream deviceUri;
257 deviceUri << serviceUrl << deviceURI;
259 ClientCallbackContext::DeviceListenContext* context =
260 new ClientCallbackContext::DeviceListenContext(callback, shared_from_this());
261 OCCallbackData cbdata(
262 static_cast<void*>(context),
263 listenDeviceCallback,
264 [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);}
267 auto cLock = m_csdkLock.lock();
270 std::lock_guard<std::recursive_mutex> lock(*cLock);
271 result = OCDoResource(nullptr, OC_REST_DISCOVER,
272 deviceUri.str().c_str(),
273 nullptr, nullptr, connectivityType,
274 static_cast<OCQualityOfService>(QoS),
281 result = OC_STACK_ERROR;
286 void parseServerHeaderOptions(OCClientResponse* clientResponse,
287 HeaderOptions& serverHeaderOptions)
291 // Parse header options from server
293 std::string optionData;
295 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
297 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
298 optionData = reinterpret_cast<const char*>
299 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
300 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
301 serverHeaderOptions.push_back(headerOption);
306 // clientResponse is invalid
307 // TODO check proper logging
308 std::cout << " Invalid response " << std::endl;
312 OCStackApplicationResult getResourceCallback(void* ctx,
313 OCDoHandle /*handle*/,
314 OCClientResponse* clientResponse)
316 ClientCallbackContext::GetContext* context =
317 static_cast<ClientCallbackContext::GetContext*>(ctx);
319 OCRepresentation rep;
320 HeaderOptions serverHeaderOptions;
321 OCStackResult result = clientResponse->result;
322 if(result == OC_STACK_OK)
324 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
327 rep = parseGetSetCallback(clientResponse);
329 catch(OC::OCException& e)
335 std::thread exec(context->callback, serverHeaderOptions, rep, result);
337 return OC_STACK_DELETE_TRANSACTION;
340 OCStackResult InProcClientWrapper::GetResourceRepresentation(
341 const OCDevAddr& devAddr,
342 const std::string& resourceUri,
343 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
344 GetCallback& callback, QualityOfService QoS)
348 return OC_STACK_INVALID_PARAM;
350 OCStackResult result;
351 ClientCallbackContext::GetContext* ctx =
352 new ClientCallbackContext::GetContext(callback);
353 OCCallbackData cbdata(
354 static_cast<void*>(ctx),
356 [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);}
359 std::string uri = assembleSetResourceUri(resourceUri, queryParams);
361 auto cLock = m_csdkLock.lock();
365 std::lock_guard<std::recursive_mutex> lock(*cLock);
366 OCHeaderOption options[MAX_HEADER_OPTIONS];
368 result = OCDoResource(
369 nullptr, OC_REST_GET,
373 static_cast<OCQualityOfService>(QoS),
375 assembleHeaderOptions(options, headerOptions),
376 headerOptions.size());
381 result = OC_STACK_ERROR;
387 OCStackApplicationResult setResourceCallback(void* ctx,
388 OCDoHandle /*handle*/,
389 OCClientResponse* clientResponse)
391 ClientCallbackContext::SetContext* context =
392 static_cast<ClientCallbackContext::SetContext*>(ctx);
393 OCRepresentation attrs;
394 HeaderOptions serverHeaderOptions;
396 OCStackResult result = clientResponse->result;
397 if (OC_STACK_OK == result ||
398 OC_STACK_RESOURCE_CREATED == result ||
399 OC_STACK_RESOURCE_DELETED == result)
401 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
404 attrs = parseGetSetCallback(clientResponse);
406 catch(OC::OCException& e)
412 std::thread exec(context->callback, serverHeaderOptions, attrs, result);
414 return OC_STACK_DELETE_TRANSACTION;
417 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
418 const QueryParamsMap& queryParams)
420 if(uri.back() == '/')
422 uri.resize(uri.size()-1);
425 ostringstream paramsList;
426 if(queryParams.size() > 0)
431 for(auto& param : queryParams)
433 paramsList << param.first <<'='<<param.second<<';';
436 std::string queryString = paramsList.str();
437 if(queryString.back() == ';')
439 queryString.resize(queryString.size() - 1);
442 std::string ret = uri + queryString;
446 OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
448 MessageContainer ocInfo;
449 ocInfo.addRepresentation(rep);
450 return reinterpret_cast<OCPayload*>(ocInfo.getPayload());
453 OCStackResult InProcClientWrapper::PostResourceRepresentation(
454 const OCDevAddr& devAddr,
455 const std::string& uri,
456 const OCRepresentation& rep,
457 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
458 PostCallback& callback, QualityOfService QoS)
462 return OC_STACK_INVALID_PARAM;
464 OCStackResult result;
465 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
466 OCCallbackData cbdata(
467 static_cast<void*>(ctx),
469 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
472 std::string url = assembleSetResourceUri(uri, queryParams);
474 auto cLock = m_csdkLock.lock();
478 std::lock_guard<std::recursive_mutex> lock(*cLock);
479 OCHeaderOption options[MAX_HEADER_OPTIONS];
481 result = OCDoResource(nullptr, OC_REST_POST,
482 url.c_str(), &devAddr,
483 assembleSetResourcePayload(rep),
485 static_cast<OCQualityOfService>(QoS),
487 assembleHeaderOptions(options, headerOptions),
488 headerOptions.size());
493 result = OC_STACK_ERROR;
499 OCStackResult InProcClientWrapper::PutResourceRepresentation(
500 const OCDevAddr& devAddr,
501 const std::string& uri,
502 const OCRepresentation& rep,
503 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
504 PutCallback& callback, QualityOfService QoS)
508 return OC_STACK_INVALID_PARAM;
510 OCStackResult result;
511 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext(callback);
512 OCCallbackData cbdata(
513 static_cast<void*>(ctx),
515 [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);}
518 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
520 auto cLock = m_csdkLock.lock();
524 std::lock_guard<std::recursive_mutex> lock(*cLock);
526 OCHeaderOption options[MAX_HEADER_OPTIONS];
528 result = OCDoResource(&handle, OC_REST_PUT,
529 url.c_str(), &devAddr,
530 assembleSetResourcePayload(rep),
532 static_cast<OCQualityOfService>(QoS),
534 assembleHeaderOptions(options, headerOptions),
535 headerOptions.size());
540 result = OC_STACK_ERROR;
546 OCStackApplicationResult deleteResourceCallback(void* ctx,
547 OCDoHandle /*handle*/,
548 OCClientResponse* clientResponse)
550 ClientCallbackContext::DeleteContext* context =
551 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
552 HeaderOptions serverHeaderOptions;
554 if(clientResponse->result == OC_STACK_OK)
556 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
558 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
560 return OC_STACK_DELETE_TRANSACTION;
563 OCStackResult InProcClientWrapper::DeleteResource(
564 const OCDevAddr& devAddr,
565 const std::string& uri,
566 const HeaderOptions& headerOptions,
567 DeleteCallback& callback,
568 QualityOfService /*QoS*/)
572 return OC_STACK_INVALID_PARAM;
574 OCStackResult result;
575 ClientCallbackContext::DeleteContext* ctx =
576 new ClientCallbackContext::DeleteContext(callback);
577 OCCallbackData cbdata(
578 static_cast<void*>(ctx),
579 deleteResourceCallback,
580 [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);}
583 auto cLock = m_csdkLock.lock();
587 OCHeaderOption options[MAX_HEADER_OPTIONS];
589 std::lock_guard<std::recursive_mutex> lock(*cLock);
591 result = OCDoResource(nullptr, OC_REST_DELETE,
592 uri.c_str(), &devAddr,
595 static_cast<OCQualityOfService>(m_cfg.QoS),
597 assembleHeaderOptions(options, headerOptions),
598 headerOptions.size());
603 result = OC_STACK_ERROR;
609 OCStackApplicationResult observeResourceCallback(void* ctx,
610 OCDoHandle /*handle*/,
611 OCClientResponse* clientResponse)
613 ClientCallbackContext::ObserveContext* context =
614 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
615 OCRepresentation attrs;
616 HeaderOptions serverHeaderOptions;
617 uint32_t sequenceNumber = clientResponse->sequenceNumber;
618 OCStackResult result = clientResponse->result;
619 if(clientResponse->result == OC_STACK_OK)
621 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
624 attrs = parseGetSetCallback(clientResponse);
626 catch(OC::OCException& e)
631 std::thread exec(context->callback, serverHeaderOptions, attrs,
632 result, sequenceNumber);
634 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
636 return OC_STACK_DELETE_TRANSACTION;
638 return OC_STACK_KEEP_TRANSACTION;
641 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
642 const OCDevAddr& devAddr,
643 const std::string& uri,
644 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
645 ObserveCallback& callback, QualityOfService QoS)
649 return OC_STACK_INVALID_PARAM;
651 OCStackResult result;
653 ClientCallbackContext::ObserveContext* ctx =
654 new ClientCallbackContext::ObserveContext(callback);
655 OCCallbackData cbdata(
656 static_cast<void*>(ctx),
657 observeResourceCallback,
658 [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);}
662 if (observeType == ObserveType::Observe)
664 method = OC_REST_OBSERVE;
666 else if (observeType == ObserveType::ObserveAll)
668 method = OC_REST_OBSERVE_ALL;
672 method = OC_REST_OBSERVE_ALL;
675 std::string url = assembleSetResourceUri(uri, queryParams).c_str();
677 auto cLock = m_csdkLock.lock();
681 std::lock_guard<std::recursive_mutex> lock(*cLock);
682 OCHeaderOption options[MAX_HEADER_OPTIONS];
684 result = OCDoResource(handle, method,
685 url.c_str(), &devAddr,
688 static_cast<OCQualityOfService>(QoS),
690 assembleHeaderOptions(options, headerOptions),
691 headerOptions.size());
696 return OC_STACK_ERROR;
702 OCStackResult InProcClientWrapper::CancelObserveResource(
704 const std::string& /*host*/,
705 const std::string& /*uri*/,
706 const HeaderOptions& headerOptions,
707 QualityOfService QoS)
709 OCStackResult result;
710 auto cLock = m_csdkLock.lock();
714 std::lock_guard<std::recursive_mutex> lock(*cLock);
715 OCHeaderOption options[MAX_HEADER_OPTIONS];
717 result = OCCancel(handle,
718 static_cast<OCQualityOfService>(QoS),
719 assembleHeaderOptions(options, headerOptions),
720 headerOptions.size());
724 result = OC_STACK_ERROR;
730 OCStackApplicationResult subscribePresenceCallback(void* ctx,
731 OCDoHandle /*handle*/,
732 OCClientResponse* clientResponse)
734 ClientCallbackContext::SubscribePresenceContext* context =
735 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
738 * This a hack while we rethink presence subscription.
740 std::string url = clientResponse->devAddr.addr;
742 std::thread exec(context->callback, clientResponse->result,
743 clientResponse->sequenceNumber, url);
747 return OC_STACK_KEEP_TRANSACTION;
750 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
751 const std::string& host, const std::string& resourceType,
752 OCConnectivityType connectivityType, SubscribeCallback& presenceHandler)
756 return OC_STACK_INVALID_PARAM;
759 ClientCallbackContext::SubscribePresenceContext* ctx =
760 new ClientCallbackContext::SubscribePresenceContext(presenceHandler);
761 OCCallbackData cbdata(
762 static_cast<void*>(ctx),
763 subscribePresenceCallback,
765 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);}
768 auto cLock = m_csdkLock.lock();
770 std::ostringstream os;
771 os << host << OC_RSRVD_PRESENCE_URI;
773 if(!resourceType.empty())
775 os << "?rt=" << resourceType;
781 return OC_STACK_ERROR;
784 return OCDoResource(handle, OC_REST_PRESENCE,
785 os.str().c_str(), nullptr,
786 nullptr, connectivityType,
787 OC_LOW_QOS, &cbdata, NULL, 0);
790 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
792 OCStackResult result;
793 auto cLock = m_csdkLock.lock();
797 std::lock_guard<std::recursive_mutex> lock(*cLock);
798 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
802 result = OC_STACK_ERROR;
808 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
814 OCHeaderOption* InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
815 const HeaderOptions& headerOptions)
819 if( headerOptions.size() == 0)
824 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
826 options[i] = OCHeaderOption(OC_COAP_ID,
828 it->getOptionData().length() + 1,
829 reinterpret_cast<const uint8_t*>(it->getOptionData().c_str()));