1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
// Constructs the client wrapper from a weak reference to the shared C-SDK mutex and
// the platform configuration.
// When m_cfg.mode is ModeType::Client, OCInit() is called with the configured address
// and port as OC_CLIENT; a result other than OC_STACK_OK raises InitializeException
// carrying that result, otherwise listeningFunc() is started on m_listeningThread.
// NOTE(review): the initializer list continues past the lines shown here, and no
// statement shown here sets m_threadRun to true before the thread starts — confirm.
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
41 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
43 if(OC_STACK_OK != result)
45 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
// `this` is handed to the thread, so the object has to outlive the thread's run.
49 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
// Destructor. Joins m_listeningThread when m_threadRun is set and the thread is
// joinable, then performs a Client-mode-only step (the stack stop, per the comment
// below).
// NOTE(review): the statements guarded by these two conditions are only partly shown
// here — in particular whatever asks the listener to finish before join() — confirm.
53 InProcClientWrapper::~InProcClientWrapper()
55 if(m_threadRun && m_listeningThread.joinable())
58 m_listeningThread.join();
61 // only stop if we are the ones who actually called 'init'. We are counting
62 // on the server to do the stop.
63 if(m_cfg.mode == ModeType::Client)
// Function executed on m_listeningThread (it is the callable given to std::thread in
// the constructor).
// Promotes m_csdkLock to a shared_ptr and takes the recursive mutex through a
// lock_guard; OC_STACK_ERROR is recorded as the result on the alternative path.
// A non-OK result is currently ignored (see the TODO), and the thread then sleeps
// for 10 ms.
// NOTE(review): the loop construct, the declaration of `result` and the call made
// while the lock is held are not shown here — confirm against the full function.
69 void InProcClientWrapper::listeningFunc()
// weak_ptr::lock() yields an empty shared_ptr once the mutex owner is gone.
74 auto cLock = m_csdkLock.lock();
77 std::lock_guard<std::recursive_mutex> lock(*cLock);
82 result = OC_STACK_ERROR;
85 if(result != OC_STACK_OK)
87 // TODO: do something with result if failed?
90 // To minimize CPU utilization we may wish to do this with sleep
91 std::this_thread::sleep_for(std::chrono::milliseconds(10));
// Builds an OCRepresentation from the JSON payload of a client response.
// Returns a default-constructed OCRepresentation when the payload pointer is null,
// when the payload is an empty string, or when parsing yields no representations.
// Otherwise the first parsed representation becomes the root and the representations
// visited by std::for_each are attached to it with addChild().
// clientResponse is dereferenced without a null check.
// NOTE(review): the declaration of `oc` and the final return are not shown here.
// NOTE(review): as shown, `it` still refers to the root element when std::for_each
// starts; confirm it is advanced first so the root is not added as its own child.
95 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
97 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
99 return OCRepresentation();
103 oc.setJSONRepresentation(clientResponse->resJSONPayload);
105 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
106 if(it == oc.representations().end())
108 return OCRepresentation();
111 // first one is considered the root, everything else is considered a child of this one.
112 OCRepresentation root = *it;
// The lambda captures root by reference and mutates it in place.
115 std::for_each(it, oc.representations().end(),
116 [&root](const OCRepresentation& repItr)
117 {root.addChild(repItr);});
// C-stack callback for resource discovery responses; ListenForResource installs it
// as cbdata.cb.
// ctx is interpreted as a ClientCallbackContext::ListenContext*.
// Every return shown here yields OC_STACK_KEEP_TRANSACTION: after logging a non-OK
// response, after failing to obtain the client wrapper, after a std::exception is
// caught, and at the end of the normal path.
// On the normal path the payload is written into a stringstream, a ListenOCContainer
// is constructed from it, and the user's callback is started on a new std::thread for
// each entry of container.Resources().
// NOTE(review): `exec` has to be detached or joined before it leaves scope; that
// statement, the `try` and the remaining constructor arguments are not shown here.
122 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
123 OCClientResponse* clientResponse)
125 ClientCallbackContext::ListenContext* context =
126 static_cast<ClientCallbackContext::ListenContext*>(ctx);
128 if(clientResponse->result != OC_STACK_OK)
130 oclog() << "listenCallback(): failed to create resource. clientResponse: "
131 << clientResponse->result
134 return OC_STACK_KEEP_TRANSACTION;
// The context only holds a weak reference to the wrapper; promote it before use.
137 auto clientWrapper = context->clientWrapper.lock();
141 oclog() << "listenCallback(): failed to get a shared_ptr to the client wrapper"
143 return OC_STACK_KEEP_TRANSACTION;
146 std::stringstream requestStream;
147 requestStream << clientResponse->resJSONPayload;
151 ListenOCContainer container(clientWrapper, *clientResponse->addr,
154 // loop to ensure valid construction of all resources
// `auto resource` copies each element; the copy is what the thread receives.
155 for(auto resource : container.Resources())
157 std::thread exec(context->callback, resource);
162 catch(const std::exception& e)
164 oclog() << "listenCallback failed to parse a malformed message: "
166 << std::endl <<std::endl
167 << clientResponse->result
169 return OC_STACK_KEEP_TRANSACTION;
172 return OC_STACK_KEEP_TRANSACTION;
// Issues an OC_REST_GET request through OCDoResource and routes the responses to
// `callback` via listenCallback.
// A heap-allocated ListenContext carries the callback and a reference to this wrapper
// obtained from shared_from_this(); cbdata.cd is set to a lambda that deletes it.
// OCDoResource is called while the C-SDK recursive mutex is held; OC_STACK_ERROR is
// the result assigned on the other path.
// NOTE(review): resourceType.c_str() is the only string argument shown here and
// serviceUrl does not appear — confirm how the request URI is formed.
// NOTE(review): the declaration of `handle`, the remaining OCDoResource arguments
// and the final return are not shown here; nothing shown frees `context` when
// OCDoResource is not reached — confirm.
175 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
176 const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
178 OCStackResult result;
180 OCCallbackData cbdata = {0};
182 ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
183 context->callback = callback;
184 context->clientWrapper = shared_from_this();
186 cbdata.context = static_cast<void*>(context);
187 cbdata.cb = listenCallback;
188 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
// Promote the weak reference to the shared C-SDK mutex before locking it.
190 auto cLock = m_csdkLock.lock();
193 std::lock_guard<std::recursive_mutex> lock(*cLock);
195 result = OCDoResource(&handle, OC_REST_GET,
196 resourceType.c_str(),
198 static_cast<OCQualityOfService>(QoS),
205 result = OC_STACK_ERROR;
// C-stack callback for device discovery responses; ListenForDevice installs it as
// cbdata.cb.
// ctx is interpreted as a ClientCallbackContext::DeviceListenContext*. The response
// payload is parsed with parseGetSetCallback() and the resulting representation is
// passed to the user's callback on a new std::thread.
// Returns OC_STACK_KEEP_TRANSACTION.
// NOTE(review): clientResponse->result is not examined in the lines shown here, and
// the detach/join of `exec` is not shown either — confirm.
210 OCStackApplicationResult listenDeviceCallback(void* ctx, OCDoHandle handle,
211 OCClientResponse* clientResponse)
213 ClientCallbackContext::DeviceListenContext* context =
214 static_cast<ClientCallbackContext::DeviceListenContext*>(ctx);
216 OCRepresentation rep = parseGetSetCallback(clientResponse);
217 std::thread exec(context->callback, rep);
220 return OC_STACK_KEEP_TRANSACTION;
// Issues an OC_REST_GET request through OCDoResource and routes the responses to
// `callback` via listenDeviceCallback.
// A heap-allocated DeviceListenContext carries the callback and a reference to this
// wrapper obtained from shared_from_this(); cbdata.cd is set to a lambda that
// deletes it.
// OCDoResource is called while the C-SDK recursive mutex is held; OC_STACK_ERROR is
// the result assigned on the other path.
// NOTE(review): the URI argument built from serviceUrl/deviceURI, the declaration of
// `handle` and the final return are not shown here; nothing shown frees `context`
// when OCDoResource is not reached — confirm.
223 OCStackResult InProcClientWrapper::ListenForDevice(const std::string& serviceUrl,
224 const std::string& deviceURI, FindDeviceCallback& callback, QualityOfService QoS)
226 OCStackResult result;
228 OCCallbackData cbdata = {0};
230 ClientCallbackContext::DeviceListenContext* context =
231 new ClientCallbackContext::DeviceListenContext();
232 context->callback = callback;
233 context->clientWrapper = shared_from_this();
235 cbdata.context = static_cast<void*>(context);
236 cbdata.cb = listenDeviceCallback;
237 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeviceListenContext*>(c);};
// Promote the weak reference to the shared C-SDK mutex before locking it.
239 auto cLock = m_csdkLock.lock();
242 std::lock_guard<std::recursive_mutex> lock(*cLock);
244 result = OCDoResource(&handle, OC_REST_GET,
247 static_cast<OCQualityOfService>(QoS),
253 result = OC_STACK_ERROR;
// Appends the vendor-specific header options received in clientResponse to
// serverHeaderOptions, one HeaderOption::OCHeaderOption(optionID, optionData) per
// received entry, in index order.
// The option bytes are reinterpreted as a C string, so the std::string assignment
// stops at the first NUL byte.
// On the path commented "clientResponse is invalid" the text " Invalid response " is
// written to std::cout.
// NOTE(review): the condition selecting that path and the declaration of optionID
// are not shown here. The loop index is a signed int compared against the count
// field; confirm the field's type.
258 void parseServerHeaderOptions(OCClientResponse* clientResponse,
259 HeaderOptions& serverHeaderOptions)
263 // Parse header options from server
265 std::string optionData;
267 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
269 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
270 optionData = reinterpret_cast<const char*>
271 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
272 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
273 serverHeaderOptions.push_back(headerOption);
278 // clientResponse is invalid
279 // TODO check proper logging
280 std::cout << " Invalid response " << std::endl;
// C-stack callback for GET responses; GetResourceRepresentation installs it as
// cbdata.cb.
// ctx is interpreted as a ClientCallbackContext::GetContext*. Header options and the
// representation are parsed only when the response result is OC_STACK_OK; otherwise
// both stay default-constructed.
// The user's callback is started on a new std::thread with the header options, the
// representation and the response result.
// Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): `exec` has to be detached or joined before it leaves scope; that
// statement is not shown here — confirm.
284 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
285 OCClientResponse* clientResponse)
287 ClientCallbackContext::GetContext* context =
288 static_cast<ClientCallbackContext::GetContext*>(ctx);
290 OCRepresentation rep;
291 HeaderOptions serverHeaderOptions;
292 if(clientResponse->result == OC_STACK_OK)
294 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
295 rep = parseGetSetCallback(clientResponse);
298 std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
300 return OC_STACK_DELETE_TRANSACTION;
// Sends an OC_REST_GET request for host + uri (with query parameters appended by
// assembleSetResourceUri) and routes the response to `callback` via
// getResourceCallback.
// A heap-allocated GetContext carries the callback; cbdata.cd is set to a lambda that
// deletes it.
// headerOptions are converted into a local OCHeaderOption array of MAX_HEADER_OPTIONS
// entries and passed together with headerOptions.size().
// OCDoResource is called while the C-SDK recursive mutex is held; OC_STACK_ERROR is
// the result assigned on the other path.
// NOTE(review): the declaration of `handle`, some OCDoResource arguments and the
// final return are not shown here; nothing shown frees `ctx` when OCDoResource is
// not reached — confirm.
303 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
304 const std::string& uri, const QueryParamsMap& queryParams,
305 const HeaderOptions& headerOptions, GetCallback& callback,
306 QualityOfService QoS)
308 OCStackResult result;
309 OCCallbackData cbdata = {0};
311 ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
312 ctx->callback = callback;
313 cbdata.context = static_cast<void*>(ctx);
314 cbdata.cb = &getResourceCallback;
315 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
// Promote the weak reference to the shared C-SDK mutex before locking it.
317 auto cLock = m_csdkLock.lock();
// Full request URI: host followed by the path and query string.
321 std::ostringstream os;
322 os << host << assembleSetResourceUri(uri, queryParams).c_str();
324 std::lock_guard<std::recursive_mutex> lock(*cLock);
326 OCHeaderOption options[MAX_HEADER_OPTIONS];
328 assembleHeaderOptions(options, headerOptions);
329 result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
331 static_cast<OCQualityOfService>(QoS),
333 options, headerOptions.size());
338 result = OC_STACK_ERROR;
// C-stack callback shared by the PUT and POST requests; both
// PutResourceRepresentation and PostResourceRepresentation install it as cbdata.cb.
// ctx is interpreted as a ClientCallbackContext::SetContext*. Header options and the
// representation are parsed only when the response result is OC_STACK_OK,
// OC_STACK_RESOURCE_CREATED or OC_STACK_RESOURCE_DELETED; otherwise both stay
// default-constructed.
// The user's callback is started on a new std::thread with the header options, the
// representation and the response result.
// Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): `exec` has to be detached or joined before it leaves scope; that
// statement is not shown here — confirm.
344 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
345 OCClientResponse* clientResponse)
347 ClientCallbackContext::SetContext* context =
348 static_cast<ClientCallbackContext::SetContext*>(ctx);
349 OCRepresentation attrs;
350 HeaderOptions serverHeaderOptions;
352 if (OC_STACK_OK == clientResponse->result ||
353 OC_STACK_RESOURCE_CREATED == clientResponse->result ||
354 OC_STACK_RESOURCE_DELETED == clientResponse->result)
356 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
357 attrs = parseGetSetCallback(clientResponse);
360 std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
362 return OC_STACK_DELETE_TRANSACTION;
// Builds the path-and-query part of a request URI: removes one trailing '/' from
// `uri`, then appends the query parameters as "key=value" pairs joined by '&'.
// `uri` is taken by value so it can be trimmed locally without touching the caller's
// string.
// std::string::back() has undefined behavior on an empty string, so both trims test
// for emptiness first. This matters for an empty `uri` and for every call without
// query parameters, where the parameter string is empty.
// NOTE(review): the statement guarded by the queryParams.size() check and the final
// return are not shown here — confirm against the full function.
365 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
366 const QueryParamsMap& queryParams)
368 if(!uri.empty() && uri.back() == '/')
370 uri.resize(uri.size()-1);
373 ostringstream paramsList;
374 if(queryParams.size() > 0)
379 for(auto& param : queryParams)
381 paramsList << param.first <<'='<<param.second<<'&';
384 std::string queryString = paramsList.str();
// Drop the '&' left behind by the last pair, if any pair was written.
385 if(!queryString.empty() && queryString.back() == '&')
387 queryString.resize(queryString.size() - 1);
390 std::string ret = uri + queryString;
// Serializes a single representation into the JSON payload used by PUT and POST:
// `rep` is added to a fresh MessageContainer and the container's JSON form is
// returned, requested with OCInfoFormat::IncludeOC.
394 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
396 MessageContainer ocInfo;
397 ocInfo.addRepresentation(rep);
398 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
// Sends an OC_REST_POST request for host + uri (with query parameters appended by
// assembleSetResourceUri), using the JSON produced by assembleSetResourcePayload(rep)
// as the payload, and routes the response to `callback` via setResourceCallback.
// A heap-allocated SetContext carries the callback; cbdata.cd is set to a lambda that
// deletes it.
// headerOptions are converted into a local OCHeaderOption array of MAX_HEADER_OPTIONS
// entries and passed together with headerOptions.size().
// OCDoResource is called while the C-SDK recursive mutex is held; OC_STACK_ERROR is
// the result assigned on the other path.
// NOTE(review): the declarations of `os` and `handle` and the final return are not
// shown here; nothing shown frees `ctx` when OCDoResource is not reached — confirm.
401 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
402 const std::string& uri, const OCRepresentation& rep,
403 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
404 PostCallback& callback, QualityOfService QoS)
406 OCStackResult result;
407 OCCallbackData cbdata = {0};
409 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
410 ctx->callback = callback;
411 cbdata.cb = &setResourceCallback;
412 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
413 cbdata.context = static_cast<void*>(ctx);
415 // TODO: in the future the cstack should be combining these two strings!
417 os << host << assembleSetResourceUri(uri, queryParams).c_str();
418 // TODO: end of above
// Promote the weak reference to the shared C-SDK mutex before locking it.
420 auto cLock = m_csdkLock.lock();
424 std::lock_guard<std::recursive_mutex> lock(*cLock);
425 OCHeaderOption options[MAX_HEADER_OPTIONS];
428 assembleHeaderOptions(options, headerOptions);
// The temporary strings behind both c_str() pointers live until the end of this
// full expression, i.e. for the duration of the OCDoResource call.
429 result = OCDoResource(&handle, OC_REST_POST,
430 os.str().c_str(), nullptr,
431 assembleSetResourcePayload(rep).c_str(),
432 static_cast<OCQualityOfService>(QoS),
433 &cbdata, options, headerOptions.size());
438 result = OC_STACK_ERROR;
// Sends an OC_REST_PUT request for host + uri (with query parameters appended by
// assembleSetResourceUri), using the JSON produced by assembleSetResourcePayload(rep)
// as the payload, and routes the response to `callback` via setResourceCallback.
// A heap-allocated SetContext carries the callback; cbdata.cd is set to a lambda that
// deletes it.
// headerOptions are converted into a local OCHeaderOption array of MAX_HEADER_OPTIONS
// entries and passed together with headerOptions.size().
// OCDoResource is called while the C-SDK recursive mutex is held; OC_STACK_ERROR is
// the result assigned on the other path.
// NOTE(review): the declarations of `os` and `handle`, one OCDoResource argument and
// the final return are not shown here; nothing shown frees `ctx` when OCDoResource
// is not reached — confirm.
445 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
446 const std::string& uri, const OCRepresentation& rep,
447 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
448 PutCallback& callback, QualityOfService QoS)
450 OCStackResult result;
451 OCCallbackData cbdata = {0};
453 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
454 ctx->callback = callback;
455 cbdata.cb = &setResourceCallback;
456 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
457 cbdata.context = static_cast<void*>(ctx);
459 // TODO: in the future the cstack should be combining these two strings!
461 os << host << assembleSetResourceUri(uri, queryParams).c_str();
462 // TODO: end of above
// Promote the weak reference to the shared C-SDK mutex before locking it.
464 auto cLock = m_csdkLock.lock();
468 std::lock_guard<std::recursive_mutex> lock(*cLock);
470 OCHeaderOption options[MAX_HEADER_OPTIONS];
472 assembleHeaderOptions(options, headerOptions);
// The temporary strings behind both c_str() pointers live until the end of this
// full expression, i.e. for the duration of the OCDoResource call.
473 result = OCDoResource(&handle, OC_REST_PUT,
474 os.str().c_str(), nullptr,
475 assembleSetResourcePayload(rep).c_str(),
476 static_cast<OCQualityOfService>(QoS),
478 options, headerOptions.size());
483 result = OC_STACK_ERROR;
// C-stack callback for DELETE responses; DeleteResource installs it as cbdata.cb.
// ctx is interpreted as a ClientCallbackContext::DeleteContext*. Header options are
// parsed only when the response result is OC_STACK_OK; otherwise the list stays
// empty.
// The user's callback is started on a new std::thread with the header options and
// the response result.
// Returns OC_STACK_DELETE_TRANSACTION.
// NOTE(review): `exec` has to be detached or joined before it leaves scope; that
// statement is not shown here — confirm.
489 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
490 OCClientResponse* clientResponse)
492 ClientCallbackContext::DeleteContext* context =
493 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
494 HeaderOptions serverHeaderOptions;
496 if(clientResponse->result == OC_STACK_OK)
498 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
500 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
502 return OC_STACK_DELETE_TRANSACTION;
// Sends an OC_REST_DELETE request (no payload) and routes the response to `callback`
// via deleteResourceCallback.
// A heap-allocated DeleteContext carries the callback; cbdata.cd is set to a lambda
// that deletes it.
// headerOptions are converted into a local OCHeaderOption array of MAX_HEADER_OPTIONS
// entries and passed together with headerOptions.size().
// The request is sent with the caller-supplied QoS, matching the GET/PUT/POST/observe
// methods of this class; previously the parameter was ignored in favour of m_cfg.QoS.
// OCDoResource is called while the C-SDK recursive mutex is held; OC_STACK_ERROR is
// the result assigned on the other path.
// NOTE(review): the declarations of `os` and `handle` and the final return are not
// shown here; nothing shown frees `ctx` when OCDoResource is not reached — confirm.
505 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
506 const std::string& uri, const HeaderOptions& headerOptions,
507 DeleteCallback& callback, QualityOfService QoS)
509 OCStackResult result;
510 OCCallbackData cbdata = {0};
512 ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
513 ctx->callback = callback;
514 cbdata.cb = &deleteResourceCallback;
515 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
516 cbdata.context = static_cast<void*>(ctx);
// Promote the weak reference to the shared C-SDK mutex before locking it.
521 auto cLock = m_csdkLock.lock();
525 OCHeaderOption options[MAX_HEADER_OPTIONS];
528 assembleHeaderOptions(options, headerOptions);
530 std::lock_guard<std::recursive_mutex> lock(*cLock);
532 result = OCDoResource(&handle, OC_REST_DELETE,
533 os.str().c_str(), nullptr,
534 nullptr, static_cast<OCQualityOfService>(QoS),
535 &cbdata, options, headerOptions.size());
540 result = OC_STACK_ERROR;
// C-stack callback for observe notifications; ObserveResource installs it as
// cbdata.cb.
// ctx is interpreted as a ClientCallbackContext::ObserveContext*. Header options and
// the representation are parsed only when the response result is OC_STACK_OK.
// The user's callback is started on a new std::thread with the header options, the
// representation, the response result and the response's sequence number.
// Returns OC_STACK_DELETE_TRANSACTION when the sequence number equals
// OC_OBSERVE_DEREGISTER, otherwise OC_STACK_KEEP_TRANSACTION.
// NOTE(review): `exec` has to be detached or joined before it leaves scope; that
// statement is not shown here — confirm.
546 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
547 OCClientResponse* clientResponse)
549 ClientCallbackContext::ObserveContext* context =
550 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
551 OCRepresentation attrs;
552 HeaderOptions serverHeaderOptions;
// Copied up front; it is used both for the callback and for the return decision.
553 uint32_t sequenceNumber = clientResponse->sequenceNumber;
554 if(clientResponse->result == OC_STACK_OK)
556 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
557 attrs = parseGetSetCallback(clientResponse);
559 std::thread exec(context->callback, serverHeaderOptions, attrs,
560 clientResponse->result, sequenceNumber);
562 if(sequenceNumber == OC_OBSERVE_DEREGISTER)
564 return OC_STACK_DELETE_TRANSACTION;
566 return OC_STACK_KEEP_TRANSACTION;
// Registers an observation on host + uri (with query parameters appended by
// assembleSetResourceUri) and routes notifications to `callback` via
// observeResourceCallback. The caller-provided `handle` is passed through to
// OCDoResource.
// The request method is OC_REST_OBSERVE for ObserveType::Observe and
// OC_REST_OBSERVE_ALL both for ObserveType::ObserveAll and for the remaining branch.
// A heap-allocated ObserveContext carries the callback; cbdata.cd is set to a lambda
// that deletes it.
// OCDoResource is called while the C-SDK recursive mutex is held; on the other path
// OC_STACK_ERROR is returned directly.
// NOTE(review): the declaration of `method`, some OCDoResource arguments and the
// final return are not shown here; nothing shown frees `ctx` on the OC_STACK_ERROR
// return — confirm.
569 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
570 const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
571 const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
573 OCStackResult result;
574 OCCallbackData cbdata = {0};
576 ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
577 ctx->callback = callback;
578 cbdata.context = static_cast<void*>(ctx);
579 cbdata.cb = &observeResourceCallback;
580 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
583 if (observeType == ObserveType::Observe)
585 method = OC_REST_OBSERVE;
587 else if (observeType == ObserveType::ObserveAll)
589 method = OC_REST_OBSERVE_ALL;
593 method = OC_REST_OBSERVE_ALL;
// Promote the weak reference to the shared C-SDK mutex before locking it.
596 auto cLock = m_csdkLock.lock();
600 std::ostringstream os;
601 os << host << assembleSetResourceUri(uri, queryParams).c_str();
603 std::lock_guard<std::recursive_mutex> lock(*cLock);
604 OCHeaderOption options[MAX_HEADER_OPTIONS];
606 assembleHeaderOptions(options, headerOptions);
607 result = OCDoResource(handle, method,
608 os.str().c_str(), nullptr,
610 static_cast<OCQualityOfService>(QoS),
612 options, headerOptions.size());
617 return OC_STACK_ERROR;
// Cancels the operation identified by `handle` by calling OCCancel with the given
// QoS and the header options converted by assembleHeaderOptions.
// OCCancel is called while the C-SDK recursive mutex is held; OC_STACK_ERROR is the
// result assigned on the other path.
// NOTE(review): `host` and `uri` do not appear in the lines shown here, and the
// final return is not shown either — confirm.
623 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
624 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
625 QualityOfService QoS)
627 OCStackResult result;
// Promote the weak reference to the shared C-SDK mutex before locking it.
628 auto cLock = m_csdkLock.lock();
632 std::lock_guard<std::recursive_mutex> lock(*cLock);
633 OCHeaderOption options[MAX_HEADER_OPTIONS];
635 assembleHeaderOptions(options, headerOptions);
636 result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
637 headerOptions.size());
641 result = OC_STACK_ERROR;
// C-stack callback for presence notifications; SubscribePresence installs it as
// cbdata.cb.
// When both OCDevAddrToString() and OCDevAddrToPort() return 0 for the sender
// address, the text "address:port" is built and the user's callback is started on a
// new std::thread with the response result, the sequence number and that text.
// Otherwise a failure message is written to oclog().
// Returns OC_STACK_KEEP_TRANSACTION.
// NOTE(review): the declarations of `port` and `os`, the `else`, and the detach/join
// of `exec` are not shown here — confirm.
647 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
648 OCClientResponse* clientResponse)
650 char stringAddress[DEV_ADDR_SIZE_MAX];
654 if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
655 OCDevAddrToPort(clientResponse->addr, &port) == 0)
657 os<<stringAddress<<":"<<port;
659 ClientCallbackContext::SubscribePresenceContext* context =
660 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
// os.str() is evaluated here, so the thread receives its own copy of the text.
662 std::thread exec(context->callback, clientResponse->result,
663 clientResponse->sequenceNumber, os.str());
669 oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
670 <<"failed"<< std::flush;
672 return OC_STACK_KEEP_TRANSACTION;
// Subscribes to presence notifications from `host`: sends an OC_REST_PRESENCE request
// for host + "/oc/presence", with "?rt=<resourceType>" appended when resourceType is
// not empty, at OC_LOW_QOS and without header options. Notifications are routed to
// presenceHandler via subscribePresenceCallback. The caller-provided `handle` is
// passed through to OCDoResource, whose result is returned.
// A heap-allocated SubscribePresenceContext carries the handler; cbdata.cd is set to
// a lambda that deletes it.
// NOTE(review): the condition guarding the OC_STACK_ERROR return is not shown here
// (presumably a failed promotion of m_csdkLock) — confirm.
// NOTE(review): unlike the other request methods of this class, no lock_guard on
// *cLock is shown around OCDoResource, and nothing shown frees `ctx` on the
// OC_STACK_ERROR return — confirm against the full function.
675 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
676 const std::string& host, const std::string& resourceType,
677 SubscribeCallback& presenceHandler)
679 OCCallbackData cbdata = {0};
681 ClientCallbackContext::SubscribePresenceContext* ctx =
682 new ClientCallbackContext::SubscribePresenceContext();
683 ctx->callback = presenceHandler;
684 cbdata.cb = &subscribePresenceCallback;
685 cbdata.context = static_cast<void*>(ctx);
686 cbdata.cd = [](void* c)
687 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
688 auto cLock = m_csdkLock.lock();
690 std::ostringstream os;
691 os << host << "/oc/presence";
693 if(!resourceType.empty())
695 os << "?rt=" << resourceType;
701 return OC_STACK_ERROR;
704 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
705 OC_LOW_QOS, &cbdata, NULL, 0);
// Cancels the presence subscription identified by `handle` by calling OCCancel at
// OC_LOW_QOS without header options.
// OCCancel is called while the C-SDK recursive mutex is held; OC_STACK_ERROR is the
// result assigned on the other path.
// NOTE(review): the final return is not shown here — confirm.
708 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
710 OCStackResult result;
// Promote the weak reference to the shared C-SDK mutex before locking it.
711 auto cLock = m_csdkLock.lock();
715 std::lock_guard<std::recursive_mutex> lock(*cLock);
716 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
720 result = OC_STACK_ERROR;
// Takes a QualityOfService by non-const reference and returns an OCStackResult.
// NOTE(review): the body is not shown here; presumably it reports the configured
// default QoS through `qos` — confirm against the full function.
726 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
// Converts the C++ header options into the C-stack OCHeaderOption array `options`.
// For each entry of headerOptions the slot options[i] receives protocolID OC_COAP_ID,
// the option ID narrowed to uint16_t, and the option data copied together with its
// terminating NUL; optionLength counts that NUL as well.
// NOTE(review): the declaration and increment of `i` are not shown here.
// NOTE(review): no check shown here limits the number of slots written or the number
// of bytes copied into optionData; callers in this file pass an array of
// MAX_HEADER_OPTIONS entries — confirm that bounds are enforced somewhere.
732 void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
733 const HeaderOptions& headerOptions)
737 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
739 options[i].protocolID = OC_COAP_ID;
740 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
741 options[i].optionLength = (it->getOptionData()).length() + 1;
742 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
743 (it->getOptionData()).length() + 1);