+#ifdef WITH_MQ
+ OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
+ OCClientResponse* clientResponse)
+ {
+ ClientCallbackContext::MQTopicContext* context =
+ static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
+ HeaderOptions serverHeaderOptions;
+
+ if (!clientResponse || !context)
+ {
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ std::string createdUri;
+ bool isLocationOption = false;
+ OCStackResult result = clientResponse->result;
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ parseServerHeaderOptions(clientResponse, serverHeaderOptions);
+
+ for (auto headerOption : serverHeaderOptions)
+ {
+ if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
+ {
+ createdUri += "/";
+ createdUri += headerOption.getOptionData();
+ if (!isLocationOption)
+ {
+ isLocationOption = true;
+ }
+ }
+ }
+ }
+
+ if (!isLocationOption)
+ {
+ createdUri = std::string(clientResponse->resourceUri);
+ }
+
+ auto clientWrapper = context->clientWrapper.lock();
+
+ if (!clientWrapper)
+ {
+ oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
+ << std::flush;
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ try
+ {
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+ createdUri);
+ for (auto resource : container.Resources())
+ {
+ std::thread exec(context->callback, result,
+ createdUri,
+ resource);
+ exec.detach();
+ }
+ }
+ else
+ {
+ std::thread exec(context->callback, result,
+ createdUri,
+ nullptr);
+ exec.detach();
+ }
+ }
+ catch (std::exception &e)
+ {
+ oclog() << "Exception in createMQTopicCallback, ignoring response: "
+ << e.what() << std::flush;
+ }
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
+ const OCDevAddr& devAddr,
+ const std::string& uri,
+ const OCRepresentation& rep,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ MQTopicCallback& callback, QualityOfService QoS)
+ {
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ OCStackResult result;
+ ClientCallbackContext::MQTopicContext* ctx =
+ new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(ctx),
+ cbdata.cb = createMQTopicCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
+
+ std::string url = assembleSetResourceUri(uri, queryParams);
+
+ auto cLock = m_csdkLock.lock();
+
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+ OCHeaderOption options[MAX_HEADER_OPTIONS];
+
+ result = OCDoResource(nullptr, OC_REST_PUT,
+ url.c_str(), &devAddr,
+ assembleSetResourcePayload(rep),
+ CT_DEFAULT,
+ static_cast<OCQualityOfService>(QoS),
+ &cbdata,
+ assembleHeaderOptions(options, headerOptions),
+ headerOptions.size());
+ }
+ else
+ {
+ delete ctx;
+ result = OC_STACK_ERROR;
+ }
+
+ return result;
+ }
+#endif