+#ifdef WITH_MQ
+ OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
+ OCClientResponse* clientResponse)
+ {
+ ClientCallbackContext::CreateMQTopicContext* context =
+ static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
+ OCRepresentation rep;
+ HeaderOptions serverHeaderOptions;
+
+ if (!clientResponse)
+ {
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ std::string createdUri;
+ OCStackResult result = clientResponse->result;
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ parseServerHeaderOptions(clientResponse, serverHeaderOptions);
+ try
+ {
+ rep = parseGetSetCallback(clientResponse);
+ }
+ catch(OC::OCException& e)
+ {
+ result = e.code();
+ }
+
+ bool isLocationOption = false;
+ for (auto headerOption : serverHeaderOptions)
+ {
+ if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
+ {
+ createdUri += "/";
+ createdUri += headerOption.getOptionData();
+ if (!isLocationOption)
+ {
+ isLocationOption = true;
+ }
+ }
+ }
+
+ if (!isLocationOption)
+ {
+ createdUri = clientResponse->resourceUri;
+ }
+ }
+
+ auto clientWrapper = context->clientWrapper.lock();
+
+ if (!clientWrapper)
+ {
+ oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
+ << std::flush;
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ try{
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+ createdUri);
+ for (auto resource : container.Resources())
+ {
+ std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
+ exec.detach();
+ }
+ }
+ else
+ {
+ std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
+ exec.detach();
+ }
+ }
+ catch (std::exception &e){
+ oclog() << "Exception in createMQTopicCallback, ignoring response: "
+ << e.what() << std::flush;
+ }
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
+ const OCDevAddr& devAddr,
+ const std::string& uri,
+ const OCRepresentation& rep,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ MQCreateTopicCallback& callback, QualityOfService QoS)
+ {
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ OCStackResult result;
+ ClientCallbackContext::CreateMQTopicContext* ctx =
+ new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(ctx),
+ cbdata.cb = createMQTopicCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
+
+ std::string url = assembleSetResourceUri(uri, queryParams);
+
+ auto cLock = m_csdkLock.lock();
+
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+ OCHeaderOption options[MAX_HEADER_OPTIONS];
+
+ result = OCDoResource(nullptr, OC_REST_PUT,
+ url.c_str(), &devAddr,
+ assembleSetResourcePayload(rep),
+ CT_DEFAULT,
+ static_cast<OCQualityOfService>(QoS),
+ &cbdata,
+ assembleHeaderOptions(options, headerOptions),
+ headerOptions.size());
+ }
+ else
+ {
+ delete ctx;
+ result = OC_STACK_ERROR;
+ }
+
+ return result;
+ }
+#endif