return OC_STACK_KEEP_TRANSACTION;
}
- try{
+ try
+ {
ListenOCContainer container(clientWrapper, clientResponse->devAddr,
reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
// loop to ensure valid construction of all resources
+
for(auto resource : container.Resources())
{
std::thread exec(context->callback, resource);
exec.detach();
}
}
- catch (std::exception &e){
+ catch (std::exception &e)
+ {
oclog() << "Exception in listCallback, ignoring response: "
<< e.what() << std::flush;
}
std::string resourceURI = clientResponse->resourceUri;
std::thread exec(context->errorCallback, resourceURI, result);
exec.detach();
- return OC_STACK_DELETE_TRANSACTION;
+ return OC_STACK_KEEP_TRANSACTION;
}
OCStackResult InProcClientWrapper::ListenForResource(
}
return result;
}
+#ifdef WITH_MQ
+ OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
+ OCClientResponse* clientResponse)
+ {
+ ClientCallbackContext::MQTopicContext* context =
+ static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
+
+ if (!clientResponse || !context)
+ {
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ std::string resourceURI = clientResponse->resourceUri;
+ if (clientResponse->result != OC_STACK_OK)
+ {
+ oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
+ << clientResponse->result
+ << std::flush;
+
+ std::thread exec(context->callback, clientResponse->result,
+ resourceURI, nullptr);
+ exec.detach();
+
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ auto clientWrapper = context->clientWrapper.lock();
+ if (!clientWrapper)
+ {
+ oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
+ << std::flush;
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ try
+ {
+ ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+ (OCRepPayload *) clientResponse->payload);
+
+ // loop to ensure valid construction of all resources
+ for (auto resource : container.Resources())
+ {
+ std::thread exec(context->callback, clientResponse->result,
+ resourceURI, resource);
+ exec.detach();
+ }
+ }
+ catch (std::exception &e)
+ {
+ oclog() << "Exception in listCallback, ignoring response: "
+ << e.what() << std::flush;
+ }
+
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
+ const std::string& resourceUri,
+ const QueryParamsMap& queryParams,
+ const HeaderOptions& headerOptions,
+ MQTopicCallback& callback,
+ QualityOfService QoS)
+ {
+ oclog() << "ListenForMQTopic()" << std::flush;
+
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ ClientCallbackContext::MQTopicContext* context =
+ new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(context),
+ cbdata.cb = listenMQCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
+
+ std::string uri = assembleSetResourceUri(resourceUri, queryParams);
+
+ OCStackResult result = OC_STACK_ERROR;
+ auto cLock = m_csdkLock.lock();
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+ OCHeaderOption options[MAX_HEADER_OPTIONS];
+ result = OCDoResource(
+ nullptr, OC_REST_GET,
+ uri.c_str(),
+ &devAddr, nullptr,
+ CT_DEFAULT,
+ static_cast<OCQualityOfService>(QoS),
+ &cbdata,
+ assembleHeaderOptions(options, headerOptions),
+ headerOptions.size());
+ }
+ else
+ {
+ delete context;
+ }
+
+ return result;
+ }
+#endif
OCStackApplicationResult listenDeviceCallback(void* ctx,
OCDoHandle /*handle*/,
}
}
+#ifdef WITH_MQ
+ OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
+ OCClientResponse* clientResponse)
+ {
+ ClientCallbackContext::MQTopicContext* context =
+ static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
+ HeaderOptions serverHeaderOptions;
+
+ if (!clientResponse || !context)
+ {
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ std::string createdUri;
+ bool isLocationOption = false;
+ OCStackResult result = clientResponse->result;
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ parseServerHeaderOptions(clientResponse, serverHeaderOptions);
+
+ for (auto headerOption : serverHeaderOptions)
+ {
+ if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
+ {
+ createdUri += "/";
+ createdUri += headerOption.getOptionData();
+ if (!isLocationOption)
+ {
+ isLocationOption = true;
+ }
+ }
+ }
+ }
+
+ if (!isLocationOption)
+ {
+ createdUri = std::string(clientResponse->resourceUri);
+ }
+
+ auto clientWrapper = context->clientWrapper.lock();
+
+ if (!clientWrapper)
+ {
+ oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
+ << std::flush;
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ try
+ {
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+ createdUri);
+ for (auto resource : container.Resources())
+ {
+ std::thread exec(context->callback, result,
+ createdUri,
+ resource);
+ exec.detach();
+ }
+ }
+ else
+ {
+ std::thread exec(context->callback, result,
+ createdUri,
+ nullptr);
+ exec.detach();
+ }
+ }
+ catch (std::exception &e)
+ {
+ oclog() << "Exception in createMQTopicCallback, ignoring response: "
+ << e.what() << std::flush;
+ }
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
+ const OCDevAddr& devAddr,
+ const std::string& uri,
+ const OCRepresentation& rep,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ MQTopicCallback& callback, QualityOfService QoS)
+ {
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ OCStackResult result;
+ ClientCallbackContext::MQTopicContext* ctx =
+ new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(ctx),
+ cbdata.cb = createMQTopicCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
+
+ std::string url = assembleSetResourceUri(uri, queryParams);
+
+ auto cLock = m_csdkLock.lock();
+
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+ OCHeaderOption options[MAX_HEADER_OPTIONS];
+
+ result = OCDoResource(nullptr, OC_REST_PUT,
+ url.c_str(), &devAddr,
+ assembleSetResourcePayload(rep),
+ CT_DEFAULT,
+ static_cast<OCQualityOfService>(QoS),
+ &cbdata,
+ assembleHeaderOptions(options, headerOptions),
+ headerOptions.size());
+ }
+ else
+ {
+ delete ctx;
+ result = OC_STACK_ERROR;
+ }
+
+ return result;
+ }
+#endif
OCStackApplicationResult getResourceCallback(void* ctx,
OCDoHandle /*handle*/,
OCClientResponse* clientResponse)
const OCDevAddr& devAddr,
const std::string& resourceUri,
const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ OCConnectivityType connectivityType,
GetCallback& callback, QualityOfService QoS)
{
if (!callback)
nullptr, OC_REST_GET,
uri.c_str(),
&devAddr, nullptr,
- CT_DEFAULT,
+ connectivityType,
static_cast<OCQualityOfService>(QoS),
&cbdata,
assembleHeaderOptions(options, headerOptions),
OCStackResult result = clientResponse->result;
if (OC_STACK_OK == result ||
OC_STACK_RESOURCE_CREATED == result ||
- OC_STACK_RESOURCE_DELETED == result)
+ OC_STACK_RESOURCE_DELETED == result ||
+ OC_STACK_RESOURCE_CHANGED == result)
{
parseServerHeaderOptions(clientResponse, serverHeaderOptions);
try
return ret;
}
+ std::string InProcClientWrapper::assembleSetResourceUri(std::string uri,
+ const QueryParamsList& queryParams)
+ {
+ if (!uri.empty())
+ {
+ if (uri.back() == '/')
+ {
+ uri.resize(uri.size() - 1);
+ }
+ }
+
+ ostringstream paramsList;
+ if (queryParams.size() > 0)
+ {
+ paramsList << '?';
+ }
+
+ for (auto& param : queryParams)
+ {
+ for (auto& paramList : param.second)
+ {
+ paramsList << param.first << '=' << paramList << ';';
+ }
+ }
+
+ std::string queryString = paramsList.str();
+
+ if (queryString.empty())
+ {
+ return uri;
+ }
+
+ if (queryString.back() == ';')
+ {
+ queryString.resize(queryString.size() - 1);
+ }
+
+ std::string ret = uri + queryString;
+ return ret;
+ }
+
OCPayload* InProcClientWrapper::assembleSetResourcePayload(const OCRepresentation& rep)
{
MessageContainer ocInfo;
const std::string& uri,
const OCRepresentation& rep,
const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ OCConnectivityType connectivityType,
PostCallback& callback, QualityOfService QoS)
{
if (!callback)
result = OCDoResource(nullptr, OC_REST_POST,
url.c_str(), &devAddr,
assembleSetResourcePayload(rep),
- CT_DEFAULT,
+ connectivityType,
static_cast<OCQualityOfService>(QoS),
&cbdata,
assembleHeaderOptions(options, headerOptions),
const OCDevAddr& devAddr,
const std::string& uri,
const HeaderOptions& headerOptions,
+ OCConnectivityType connectivityType,
DeleteCallback& callback,
QualityOfService /*QoS*/)
{
result = OCDoResource(nullptr, OC_REST_DELETE,
uri.c_str(), &devAddr,
nullptr,
- CT_DEFAULT,
+ connectivityType,
static_cast<OCQualityOfService>(m_cfg.QoS),
&cbdata,
assembleHeaderOptions(options, headerOptions),
std::thread exec(context->callback, serverHeaderOptions, attrs,
result, sequenceNumber);
exec.detach();
- if (sequenceNumber == OC_OBSERVE_DEREGISTER)
+ if (sequenceNumber == MAX_SEQUENCE_NUMBER + 1)
{
return OC_STACK_DELETE_TRANSACTION;
}
+
return OC_STACK_KEEP_TRANSACTION;
}
return result;
}
+#ifdef WITH_CLOUD
+ OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
+ const std::string& host,
+ const std::vector<std::string>& di,
+ OCConnectivityType connectivityType,
+ ObserveCallback& callback)
+ {
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ OCStackResult result;
+
+ ClientCallbackContext::ObserveContext* ctx =
+ new ClientCallbackContext::ObserveContext(callback);
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(ctx),
+ cbdata.cb = observeResourceCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::ObserveContext*)c;};
+
+ auto cLock = m_csdkLock.lock();
+
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+
+ std::ostringstream os;
+ os << host << OC_RSRVD_DEVICE_PRESENCE_URI;
+ QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
+ std::string url = assembleSetResourceUri(os.str(), queryParams);
+
+ result = OCDoResource(handle, OC_REST_OBSERVE,
+ url.c_str(), nullptr,
+ nullptr, connectivityType,
+ OC_LOW_QOS, &cbdata,
+ nullptr, 0);
+ }
+ else
+ {
+ delete ctx;
+ result = OC_STACK_ERROR;
+ }
+
+ return result;
+ }
+#endif
+
OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
{
qos = m_cfg.QoS;