1 //******************************************************************
3 // Copyright 2014 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "InProcClientWrapper.h"
24 #include "OCPlatform.h"
25 #include "OCResource.h"
26 #include <OCSerialization.h>
31 InProcClientWrapper::InProcClientWrapper(
32 std::weak_ptr<std::recursive_mutex> csdkLock, PlatformConfig cfg)
33 : m_threadRun(false), m_csdkLock(csdkLock),
36 // if the config type is server, we ought to never get called. If the config type
37 // is both, we count on the server to run the thread and do the initialize
39 if(m_cfg.mode == ModeType::Client)
41 OCStackResult result = OCInit(m_cfg.ipAddress.c_str(), m_cfg.port, OC_CLIENT);
43 if(OC_STACK_OK != result)
45 throw InitializeException(OC::InitException::STACK_INIT_ERROR, result);
49 m_listeningThread = std::thread(&InProcClientWrapper::listeningFunc, this);
53 InProcClientWrapper::~InProcClientWrapper()
55 if(m_threadRun && m_listeningThread.joinable())
58 m_listeningThread.join();
64 void InProcClientWrapper::listeningFunc()
69 auto cLock = m_csdkLock.lock();
72 std::lock_guard<std::recursive_mutex> lock(*cLock);
77 result = OC_STACK_ERROR;
80 if(result != OC_STACK_OK)
82 // TODO: do something with result if failed?
85 // To minimize CPU utilization we may wish to do this with sleep
86 std::this_thread::sleep_for(std::chrono::milliseconds(10));
90 OCStackApplicationResult listenCallback(void* ctx, OCDoHandle handle,
91 OCClientResponse* clientResponse)
93 ClientCallbackContext::ListenContext* context =
94 static_cast<ClientCallbackContext::ListenContext*>(ctx);
96 if(clientResponse->result != OC_STACK_OK)
98 oclog() << "listenCallback(): failed to create resource. clientResponse: "
99 << clientResponse->result
102 return OC_STACK_KEEP_TRANSACTION;
105 std::stringstream requestStream;
106 requestStream << clientResponse->resJSONPayload;
110 ListenOCContainer container(context->clientWrapper, *clientResponse->addr,
113 // loop to ensure valid construction of all resources
114 for(auto resource : container.Resources())
116 std::thread exec(context->callback, resource);
121 catch(const std::exception& e)
123 oclog() << "listenCallback failed to parse a malformed message: "
125 << std::endl <<std::endl
126 << clientResponse->result
128 return OC_STACK_KEEP_TRANSACTION;
131 return OC_STACK_KEEP_TRANSACTION;
135 OCStackResult InProcClientWrapper::ListenForResource(const std::string& serviceUrl,
136 const std::string& resourceType, FindCallback& callback, QualityOfService QoS)
138 OCStackResult result;
140 OCCallbackData cbdata = {0};
142 ClientCallbackContext::ListenContext* context = new ClientCallbackContext::ListenContext();
143 context->callback = callback;
144 context->clientWrapper = shared_from_this();
146 cbdata.context = static_cast<void*>(context);
147 cbdata.cb = listenCallback;
148 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ListenContext*>(c);};
150 auto cLock = m_csdkLock.lock();
153 std::lock_guard<std::recursive_mutex> lock(*cLock);
155 result = OCDoResource(&handle, OC_REST_GET,
156 resourceType.c_str(),
158 static_cast<OCQualityOfService>(QoS),
164 result = OC_STACK_ERROR;
169 OCRepresentation parseGetSetCallback(OCClientResponse* clientResponse)
171 if(clientResponse->resJSONPayload == nullptr || clientResponse->resJSONPayload[0] == '\0')
173 return OCRepresentation();
177 oc.setJSONRepresentation(clientResponse->resJSONPayload);
179 std::vector<OCRepresentation>::const_iterator it = oc.representations().begin();
180 if(it == oc.representations().end())
182 return OCRepresentation();
185 // first one is considered the root, everything else is considered a child of this one.
186 OCRepresentation root = *it;
189 std::for_each(it, oc.representations().end(),
190 [&root](const OCRepresentation& repItr)
191 {root.addChild(repItr);});
196 void parseServerHeaderOptions(OCClientResponse* clientResponse,
197 HeaderOptions& serverHeaderOptions)
201 // Parse header options from server
203 std::string optionData;
205 for(int i = 0; i < clientResponse->numRcvdVendorSpecificHeaderOptions; i++)
207 optionID = clientResponse->rcvdVendorSpecificHeaderOptions[i].optionID;
208 optionData = reinterpret_cast<const char*>
209 (clientResponse->rcvdVendorSpecificHeaderOptions[i].optionData);
210 HeaderOption::OCHeaderOption headerOption(optionID, optionData);
211 serverHeaderOptions.push_back(headerOption);
216 // clientResponse is invalid
217 // TODO check proper logging
218 std::cout << " Invalid response " << std::endl;
222 OCStackApplicationResult getResourceCallback(void* ctx, OCDoHandle handle,
223 OCClientResponse* clientResponse)
225 ClientCallbackContext::GetContext* context =
226 static_cast<ClientCallbackContext::GetContext*>(ctx);
228 OCRepresentation rep;
229 HeaderOptions serverHeaderOptions;
230 if(clientResponse->result == OC_STACK_OK)
232 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
233 rep = parseGetSetCallback(clientResponse);
236 std::thread exec(context->callback, serverHeaderOptions, rep, clientResponse->result);
238 return OC_STACK_DELETE_TRANSACTION;
241 OCStackResult InProcClientWrapper::GetResourceRepresentation(const std::string& host,
242 const std::string& uri, const QueryParamsMap& queryParams,
243 const HeaderOptions& headerOptions, GetCallback& callback,
244 QualityOfService QoS)
246 OCStackResult result;
247 OCCallbackData cbdata = {0};
249 ClientCallbackContext::GetContext* ctx = new ClientCallbackContext::GetContext();
250 ctx->callback = callback;
251 cbdata.context = static_cast<void*>(ctx);
252 cbdata.cb = &getResourceCallback;
253 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::GetContext*>(c);};
255 auto cLock = m_csdkLock.lock();
259 std::ostringstream os;
260 os << host << assembleSetResourceUri(uri, queryParams).c_str();
262 std::lock_guard<std::recursive_mutex> lock(*cLock);
264 OCHeaderOption options[MAX_HEADER_OPTIONS];
266 assembleHeaderOptions(options, headerOptions);
267 result = OCDoResource(&handle, OC_REST_GET, os.str().c_str(),
269 static_cast<OCQualityOfService>(QoS),
271 options, headerOptions.size());
275 result = OC_STACK_ERROR;
281 OCStackApplicationResult setResourceCallback(void* ctx, OCDoHandle handle,
282 OCClientResponse* clientResponse)
284 ClientCallbackContext::SetContext* context =
285 static_cast<ClientCallbackContext::SetContext*>(ctx);
286 OCRepresentation attrs;
287 HeaderOptions serverHeaderOptions;
289 if (OC_STACK_OK == clientResponse->result ||
290 OC_STACK_RESOURCE_CREATED == clientResponse->result ||
291 OC_STACK_RESOURCE_DELETED == clientResponse->result)
293 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
294 attrs = parseGetSetCallback(clientResponse);
297 std::thread exec(context->callback, serverHeaderOptions, attrs, clientResponse->result);
299 return OC_STACK_DELETE_TRANSACTION;
302 std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
303 const QueryParamsMap& queryParams)
305 if(uri.back() == '/')
307 uri.resize(uri.size()-1);
310 ostringstream paramsList;
311 if(queryParams.size() > 0)
316 for(auto& param : queryParams)
318 paramsList << param.first <<'='<<param.second<<'&';
321 std::string queryString = paramsList.str();
322 if(queryString.back() == '&')
324 queryString.resize(queryString.size() - 1);
327 std::string ret = uri + queryString;
331 std::string InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
333 MessageContainer ocInfo;
334 ocInfo.addRepresentation(rep);
335 return ocInfo.getJSONRepresentation(OCInfoFormat::IncludeOC);
338 OCStackResult InProcClientWrapper::PostResourceRepresentation(const std::string& host,
339 const std::string& uri, const OCRepresentation& rep,
340 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
341 PostCallback& callback, QualityOfService QoS)
343 OCStackResult result;
344 OCCallbackData cbdata = {0};
346 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
347 ctx->callback = callback;
348 cbdata.cb = &setResourceCallback;
349 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
350 cbdata.context = static_cast<void*>(ctx);
352 // TODO: in the future the cstack should be combining these two strings!
354 os << host << assembleSetResourceUri(uri, queryParams).c_str();
355 // TODO: end of above
357 auto cLock = m_csdkLock.lock();
361 std::lock_guard<std::recursive_mutex> lock(*cLock);
362 OCHeaderOption options[MAX_HEADER_OPTIONS];
365 assembleHeaderOptions(options, headerOptions);
366 result = OCDoResource(&handle, OC_REST_POST,
367 os.str().c_str(), nullptr,
368 assembleSetResourcePayload(rep).c_str(),
369 static_cast<OCQualityOfService>(QoS),
370 &cbdata, options, headerOptions.size());
374 result = OC_STACK_ERROR;
381 OCStackResult InProcClientWrapper::PutResourceRepresentation(const std::string& host,
382 const std::string& uri, const OCRepresentation& rep,
383 const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
384 PutCallback& callback, QualityOfService QoS)
386 OCStackResult result;
387 OCCallbackData cbdata = {0};
389 ClientCallbackContext::SetContext* ctx = new ClientCallbackContext::SetContext();
390 ctx->callback = callback;
391 cbdata.cb = &setResourceCallback;
392 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::SetContext*>(c);};
393 cbdata.context = static_cast<void*>(ctx);
395 // TODO: in the future the cstack should be combining these two strings!
397 os << host << assembleSetResourceUri(uri, queryParams).c_str();
398 // TODO: end of above
400 auto cLock = m_csdkLock.lock();
404 std::lock_guard<std::recursive_mutex> lock(*cLock);
406 OCHeaderOption options[MAX_HEADER_OPTIONS];
408 assembleHeaderOptions(options, headerOptions);
409 result = OCDoResource(&handle, OC_REST_PUT,
410 os.str().c_str(), nullptr,
411 assembleSetResourcePayload(rep).c_str(),
412 static_cast<OCQualityOfService>(QoS),
414 options, headerOptions.size());
418 result = OC_STACK_ERROR;
424 OCStackApplicationResult deleteResourceCallback(void* ctx, OCDoHandle handle,
425 OCClientResponse* clientResponse)
427 ClientCallbackContext::DeleteContext* context =
428 static_cast<ClientCallbackContext::DeleteContext*>(ctx);
429 HeaderOptions serverHeaderOptions;
431 if(clientResponse->result == OC_STACK_OK)
433 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
435 std::thread exec(context->callback, serverHeaderOptions, clientResponse->result);
437 return OC_STACK_DELETE_TRANSACTION;
440 OCStackResult InProcClientWrapper::DeleteResource(const std::string& host,
441 const std::string& uri, const HeaderOptions& headerOptions,
442 DeleteCallback& callback, QualityOfService QoS)
444 OCStackResult result;
445 OCCallbackData cbdata = {0};
447 ClientCallbackContext::DeleteContext* ctx = new ClientCallbackContext::DeleteContext();
448 ctx->callback = callback;
449 cbdata.cb = &deleteResourceCallback;
450 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::DeleteContext*>(c);};
451 cbdata.context = static_cast<void*>(ctx);
456 auto cLock = m_csdkLock.lock();
460 OCHeaderOption options[MAX_HEADER_OPTIONS];
463 assembleHeaderOptions(options, headerOptions);
465 std::lock_guard<std::recursive_mutex> lock(*cLock);
467 result = OCDoResource(&handle, OC_REST_DELETE,
468 os.str().c_str(), nullptr,
469 nullptr, static_cast<OCQualityOfService>(m_cfg.QoS),
470 &cbdata, options, headerOptions.size());
474 result = OC_STACK_ERROR;
480 OCStackApplicationResult observeResourceCallback(void* ctx, OCDoHandle handle,
481 OCClientResponse* clientResponse)
483 ClientCallbackContext::ObserveContext* context =
484 static_cast<ClientCallbackContext::ObserveContext*>(ctx);
485 OCRepresentation attrs;
486 HeaderOptions serverHeaderOptions;
487 uint32_t sequenceNumber = clientResponse->sequenceNumber;
489 if(clientResponse->result == OC_STACK_OK)
491 parseServerHeaderOptions(clientResponse, serverHeaderOptions);
492 attrs = parseGetSetCallback(clientResponse);
494 std::thread exec(context->callback, serverHeaderOptions, attrs,
495 clientResponse->result, sequenceNumber);
497 return OC_STACK_KEEP_TRANSACTION;
500 OCStackResult InProcClientWrapper::ObserveResource(ObserveType observeType, OCDoHandle* handle,
501 const std::string& host, const std::string& uri, const QueryParamsMap& queryParams,
502 const HeaderOptions& headerOptions, ObserveCallback& callback, QualityOfService QoS)
504 OCStackResult result;
505 OCCallbackData cbdata = {0};
507 ClientCallbackContext::ObserveContext* ctx = new ClientCallbackContext::ObserveContext();
508 ctx->callback = callback;
509 cbdata.context = static_cast<void*>(ctx);
510 cbdata.cb = &observeResourceCallback;
511 cbdata.cd = [](void* c){delete static_cast<ClientCallbackContext::ObserveContext*>(c);};
514 if (observeType == ObserveType::Observe)
516 method = OC_REST_OBSERVE;
518 else if (observeType == ObserveType::ObserveAll)
520 method = OC_REST_OBSERVE_ALL;
524 method = OC_REST_OBSERVE_ALL;
527 auto cLock = m_csdkLock.lock();
531 std::ostringstream os;
532 os << host << assembleSetResourceUri(uri, queryParams).c_str();
534 std::lock_guard<std::recursive_mutex> lock(*cLock);
535 OCHeaderOption options[MAX_HEADER_OPTIONS];
537 assembleHeaderOptions(options, headerOptions);
538 result = OCDoResource(handle, method,
539 os.str().c_str(), nullptr,
541 static_cast<OCQualityOfService>(QoS),
543 options, headerOptions.size());
547 return OC_STACK_ERROR;
553 OCStackResult InProcClientWrapper::CancelObserveResource(OCDoHandle handle,
554 const std::string& host, const std::string& uri, const HeaderOptions& headerOptions,
555 QualityOfService QoS)
557 OCStackResult result;
558 auto cLock = m_csdkLock.lock();
562 std::lock_guard<std::recursive_mutex> lock(*cLock);
563 OCHeaderOption options[MAX_HEADER_OPTIONS];
565 assembleHeaderOptions(options, headerOptions);
566 result = OCCancel(handle, static_cast<OCQualityOfService>(QoS), options,
567 headerOptions.size());
571 result = OC_STACK_ERROR;
577 OCStackApplicationResult subscribePresenceCallback(void* ctx, OCDoHandle handle,
578 OCClientResponse* clientResponse)
580 char stringAddress[DEV_ADDR_SIZE_MAX];
584 if(OCDevAddrToString(clientResponse->addr, stringAddress) == 0 &&
585 OCDevAddrToPort(clientResponse->addr, &port) == 0)
587 os<<stringAddress<<":"<<port;
589 ClientCallbackContext::SubscribePresenceContext* context =
590 static_cast<ClientCallbackContext::SubscribePresenceContext*>(ctx);
592 std::thread exec(context->callback, clientResponse->result,
593 clientResponse->sequenceNumber, os.str());
599 oclog() << "subscribePresenceCallback(): OCDevAddrToString() or OCDevAddrToPort() "
600 <<"failed"<< std::flush;
602 return OC_STACK_KEEP_TRANSACTION;
605 OCStackResult InProcClientWrapper::SubscribePresence(OCDoHandle* handle,
606 const std::string& host, const std::string& resourceType,
607 SubscribeCallback& presenceHandler)
609 OCCallbackData cbdata = {0};
611 ClientCallbackContext::SubscribePresenceContext* ctx =
612 new ClientCallbackContext::SubscribePresenceContext();
613 ctx->callback = presenceHandler;
614 cbdata.cb = &subscribePresenceCallback;
615 cbdata.context = static_cast<void*>(ctx);
616 cbdata.cd = [](void* c)
617 {delete static_cast<ClientCallbackContext::SubscribePresenceContext*>(c);};
618 auto cLock = m_csdkLock.lock();
620 std::ostringstream os;
621 os << host << "/oc/presence";
623 if(!resourceType.empty())
625 os << "?rt=" << resourceType;
629 return OC_STACK_ERROR;
631 return OCDoResource(handle, OC_REST_PRESENCE, os.str().c_str(), nullptr, nullptr,
632 OC_LOW_QOS, &cbdata, NULL, 0);
635 OCStackResult InProcClientWrapper::UnsubscribePresence(OCDoHandle handle)
637 OCStackResult result;
638 auto cLock = m_csdkLock.lock();
642 std::lock_guard<std::recursive_mutex> lock(*cLock);
643 result = OCCancel(handle, OC_LOW_QOS, NULL, 0);
647 result = OC_STACK_ERROR;
653 OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
659 void InProcClientWrapper::assembleHeaderOptions(OCHeaderOption options[],
660 const HeaderOptions& headerOptions)
664 for (auto it=headerOptions.begin(); it != headerOptions.end(); ++it)
666 options[i].protocolID = OC_COAP_ID;
667 options[i].optionID = static_cast<uint16_t>(it->getOptionID());
668 options[i].optionLength = (it->getOptionData()).length() + 1;
669 memcpy(options[i].optionData, (it->getOptionData()).c_str(),
670 (it->getOptionData()).length() + 1);