ListenOCContainer container(clientWrapper, clientResponse->devAddr,
reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
// loop to ensure valid construction of all resources
+
for(auto resource : container.Resources())
{
std::thread exec(context->callback, resource);
}
return result;
}
+#ifdef WITH_MQ
+ OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
+ OCClientResponse* clientResponse)
+ {
+ ClientCallbackContext::ListenContext* context =
+ static_cast<ClientCallbackContext::ListenContext*>(ctx);
+
+ if (!clientResponse)
+ {
+ return OC_STACK_KEEP_TRANSACTION;
+ }
+
+ if (clientResponse->result != OC_STACK_OK)
+ {
+ oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
+ << clientResponse->result
+ << std::flush;
+
+ return OC_STACK_KEEP_TRANSACTION;
+ }
+
+ auto clientWrapper = context->clientWrapper.lock();
+ if (!clientWrapper)
+ {
+ oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
+ << std::flush;
+ return OC_STACK_KEEP_TRANSACTION;
+ }
+
+ try{
+ ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+ (OCRepPayload *) clientResponse->payload);
+
+ // loop to ensure valid construction of all resources
+ for (auto resource : container.Resources())
+ {
+ std::thread exec(context->callback, resource);
+ exec.detach();
+ }
+ }
+ catch (std::exception &e){
+ oclog() << "Exception in listCallback, ignoring response: "
+ << e.what() << std::flush;
+ }
+
+
+ return OC_STACK_KEEP_TRANSACTION;
+ }
+
+ OCStackResult InProcClientWrapper::ListenForMQTopic(
+ const OCDevAddr& devAddr,
+ const std::string& resourceUri,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ FindCallback& callback, QualityOfService QoS)
+ {
+ oclog() << "ListenForMQTopic()" << std::flush;
+
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ ClientCallbackContext::ListenContext* context =
+ new ClientCallbackContext::ListenContext(callback, shared_from_this());
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(context),
+ cbdata.cb = listenMQCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
+
+ std::string uri = assembleSetResourceUri(resourceUri, queryParams);
+
+ OCStackResult result = OC_STACK_ERROR;
+ auto cLock = m_csdkLock.lock();
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+ OCHeaderOption options[MAX_HEADER_OPTIONS];
+ result = OCDoResource(
+ nullptr, OC_REST_GET,
+ uri.c_str(),
+ &devAddr, nullptr,
+ CT_DEFAULT,
+ static_cast<OCQualityOfService>(QoS),
+ &cbdata,
+ assembleHeaderOptions(options, headerOptions),
+ headerOptions.size());
+ }
+ else
+ {
+ delete context;
+ }
+
+ return result;
+ }
+#endif
OCStackApplicationResult listenDeviceCallback(void* ctx,
OCDoHandle /*handle*/,
}
}
+#ifdef WITH_MQ
+ OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
+ OCClientResponse* clientResponse)
+ {
+ ClientCallbackContext::CreateMQTopicContext* context =
+ static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
+ OCRepresentation rep;
+ HeaderOptions serverHeaderOptions;
+
+ if (!clientResponse)
+ {
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ std::string createdUri;
+ OCStackResult result = clientResponse->result;
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ parseServerHeaderOptions(clientResponse, serverHeaderOptions);
+ try
+ {
+ rep = parseGetSetCallback(clientResponse);
+ }
+ catch(OC::OCException& e)
+ {
+ result = e.code();
+ }
+
+ bool isLocationOption = false;
+ for (auto headerOption : serverHeaderOptions)
+ {
+ if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
+ {
+ createdUri += "/";
+ createdUri += headerOption.getOptionData();
+ if (!isLocationOption)
+ {
+ isLocationOption = true;
+ }
+ }
+ }
+
+ if (!isLocationOption)
+ {
+ createdUri = clientResponse->resourceUri;
+ }
+ }
+
+ auto clientWrapper = context->clientWrapper.lock();
+
+ if (!clientWrapper)
+ {
+ oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
+ << std::flush;
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ try{
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+ createdUri);
+ for (auto resource : container.Resources())
+ {
+ std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
+ exec.detach();
+ }
+ }
+ else
+ {
+ std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
+ exec.detach();
+ }
+ }
+ catch (std::exception &e){
+ oclog() << "Exception in createMQTopicCallback, ignoring response: "
+ << e.what() << std::flush;
+ }
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
+ const OCDevAddr& devAddr,
+ const std::string& uri,
+ const OCRepresentation& rep,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ MQCreateTopicCallback& callback, QualityOfService QoS)
+ {
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ OCStackResult result;
+ ClientCallbackContext::CreateMQTopicContext* ctx =
+ new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(ctx),
+ cbdata.cb = createMQTopicCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
+
+ std::string url = assembleSetResourceUri(uri, queryParams);
+
+ auto cLock = m_csdkLock.lock();
+
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+ OCHeaderOption options[MAX_HEADER_OPTIONS];
+
+ result = OCDoResource(nullptr, OC_REST_PUT,
+ url.c_str(), &devAddr,
+ assembleSetResourcePayload(rep),
+ CT_DEFAULT,
+ static_cast<OCQualityOfService>(QoS),
+ &cbdata,
+ assembleHeaderOptions(options, headerOptions),
+ headerOptions.size());
+ }
+ else
+ {
+ delete ctx;
+ result = OC_STACK_ERROR;
+ }
+
+ return result;
+ }
+#endif
OCStackApplicationResult getResourceCallback(void* ctx,
OCDoHandle /*handle*/,
OCClientResponse* clientResponse)
std::thread exec(context->callback, serverHeaderOptions, attrs,
result, sequenceNumber);
exec.detach();
- if (sequenceNumber == OC_OBSERVE_DEREGISTER)
- {
- return OC_STACK_DELETE_TRANSACTION;
- }
+
return OC_STACK_KEEP_TRANSACTION;
}
return result;
}
+#ifdef WITH_CLOUD
OCStackResult InProcClientWrapper::SubscribeDevicePresence(OCDoHandle* handle,
const std::string& host,
- const QueryParamsList& queryParams,
+ const std::vector<std::string>& di,
OCConnectivityType connectivityType,
ObserveCallback& callback)
{
std::ostringstream os;
os << host << OCF_RSRVD_DEVICE_PRESENCE_URI;
+ QueryParamsList queryParams({{OC_RSRVD_DEVICE_ID, di}});
std::string url = assembleSetResourceUri(os.str(), queryParams);
result = OCDoResource(handle, OC_REST_OBSERVE,
return result;
}
+#endif
OCStackResult InProcClientWrapper::GetDefaultQos(QualityOfService& qos)
{