- all APIs related MQ is called from OCResource.
(MQ Broker and Topic is handled as Resource)
- MQ publisher can use publishMQTopic API
- MQ subscriber can use subscribeMQTopic / unsubscribeMQTopic
/ requestMQPublish API
- All MQ type can use discoveryMQTopics / createMQTopic API
- publish message is sent with POST base on OCF spec
- createTopic message is sent with PUT base on OCF spec
- discoveryMQTopics / createMQTopic API is implemented through new path
of wrapper class. because their callbacks is different with others
- Local MQ(D2D) is not support. it means there is no local MQ broker for D2D scenario
- refer : https://wiki.iotivity.org/message_queue_mq_for_publish-subscribe_interactions
Change-Id: Ibc1556a389f408634832149f646cd65bf8eda154
Signed-off-by: jihwan.seo <jihwan.seo@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/8975
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Jaehong Jo <jaehong.jo@samsung.com>
Reviewed-by: Ashok Babu Channa <ashok.channa@samsung.com>
OC_PRESENCE,
#endif
+#ifdef MQ_BROKER
+ /** "/oic/ps" .*/
+ OC_MQ_BROKER_URI,
+#endif
+
/** Max items in the list */
OC_MAX_VIRTUAL_RESOURCES //<s Max items in the list
+
} OCVirtualResources;
/**
* Maximum number of vendor specific header options an application can set or receive
* in PDU
*/
-#define MAX_HEADER_OPTIONS (2)
+#define MAX_HEADER_OPTIONS (100)
/**
* Maximum Length of the vendor specific header option
/** Gateway URI.*/
#define OC_RSRVD_GATEWAY_URI "/oic/gateway"
#endif
+
+#ifdef WITH_MQ
+/** MQ Broker URI.*/
+#define OC_RSRVD_WELL_KNOWN_MQ_URI "/.well-known/ocf/ps"
+#endif
+
#ifdef WITH_PRESENCE
/** Presence URI through which the OIC devices advertise their presence.*/
/** To represent resource type with RES.*/
#define OC_RSRVD_RESOURCE_TYPE_RES "oic.wk.res"
+#ifdef WITH_MQ
+/** To represent content type with MQ Broker.*/
+#define OC_RSRVD_RESOURCE_TYPE_MQ_BROKER "ocf.wk.ps"
+
+/** To represent content type with MQ Topic.*/
+#define OC_RSRVD_RESOURCE_TYPE_MQ_TOPIC "ocf.wk.ps.topic"
+#endif
+
/** To represent interface.*/
#define OC_RSRVD_INTERFACE "if"
/** When this bit is set, the resource is allowed to be discovered only
* if discovery request contains an explicit querystring.
* Ex: GET /oic/res?rt=oic.sec.acl */
- OC_EXPLICIT_DISCOVERABLE = (1 << 5),
+ OC_EXPLICIT_DISCOVERABLE = (1 << 5)
#ifdef WITH_MQ
/** When this bit is set, the resource is allowed to be published */
- OC_MQ_PUBLISHER = (1 << 6),
+ ,OC_MQ_PUBLISHER = (1 << 6)
#endif
#ifdef MQ_BROKER
/** When this bit is set, the resource is allowed to be notified as MQ broker.*/
- OC_MQ_BROKER = (1 << 7),
+ ,OC_MQ_BROKER = (1 << 7)
#endif
} OCResourceProperty;
OC_OBSERVE_DEREGISTER = 1,
/** Others. */
- OC_OBSERVE_NO_OPTION = 2
+ OC_OBSERVE_NO_OPTION = 2,
+
+//#ifdef WITH_MQ
+ OC_MQ_SUBSCRIBER = 3,
+
+ OC_MQ_UNSUBSCRIBER = 4,
+//#endif
+
} OCObserveAction;
return OC_PRESENCE;
}
#endif //WITH_PRESENCE
+
+#ifdef MQ_BROKER
+ else if (0 == strcmp(uriInRequest, OC_RSRVD_WELL_KNOWN_MQ_URI))
+ {
+ return OC_MQ_BROKER_URI;
+ }
+#endif //MQ_BROKER
return OC_UNKNOWN_URI;
}
OCVirtualResources virtualUriInRequest = GetTypeOfVirtualURI (request->resourceUrl);
// Step 1: Generate the response to discovery request
- if (virtualUriInRequest == OC_WELL_KNOWN_URI)
+ if (virtualUriInRequest == OC_WELL_KNOWN_URI
+#ifdef MQ_BROKER
+ || virtualUriInRequest == OC_MQ_BROKER_URI
+#endif
+ )
{
if (request->method == OC_REST_PUT || request->method == OC_REST_POST || request->method == OC_REST_DELETE)
{
if (!resourceTypeQuery && interfaceQuery && (0 == strcmp(interfaceQuery, OC_RSRVD_INTERFACE_LL)))
{
+ OCResourceProperty prop = OC_DISCOVERABLE;
+#ifdef MQ_BROKER
+ if (OC_MQ_BROKER_URI == virtualUriInRequest)
+ {
+ prop = OC_MQ_BROKER;
+ }
+#endif
+
for (; resource && discoveryResult == OC_STACK_OK; resource = resource->next)
{
bool result = false;
- if (resource->resourceProperties & OC_DISCOVERABLE)
+
+ if (resource->resourceProperties & prop)
{
result = true;
}
static OCResource *tailResource = NULL;
static OCResourceHandle platformResource = {0};
static OCResourceHandle deviceResource = {0};
+#ifdef MQ_BROKER
+static OCResourceHandle brokerResource = {0};
+#endif
+
#ifdef WITH_PRESENCE
static OCPresenceState presenceState = OC_PRESENCE_UNINITIALIZED;
static PresenceResource presenceResource = {0};
OCStackResult CAResponseToOCStackResult(CAResponseResult_t caCode)
{
OCStackResult ret = OC_STACK_ERROR;
-
switch(caCode)
{
case CA_CREATED:
{
type = PAYLOAD_TYPE_DISCOVERY;
}
+#ifdef WITH_MQ
+ else if (strcmp(cbNode->requestUri, OC_RSRVD_WELL_KNOWN_MQ_URI) == 0)
+ {
+ type = PAYLOAD_TYPE_DISCOVERY;
+ }
+#endif
else if (strcmp(cbNode->requestUri, OC_RSRVD_DEVICE_URI) == 0)
{
type = PAYLOAD_TYPE_DEVICE;
(observationOption << 8) | optionData[i];
}
response.sequenceNumber = observationOption;
-
response.numRcvdVendorSpecificHeaderOptions = responseInfo->info.numOptions - 1;
start = 1;
}
return OC_STACK_INVALID_PARAM;
}
- if(!resourceInterfaceName || strlen(resourceInterfaceName) == 0)
+ if (!resourceInterfaceName || strlen(resourceInterfaceName) == 0)
{
resourceInterfaceName = OC_RSRVD_INTERFACE_DEFAULT;
}
}
memset(&platformResource, 0, sizeof(platformResource));
memset(&deviceResource, 0, sizeof(deviceResource));
+#ifdef MQ_BROKER
+ memset(&brokerResource, 0, sizeof(brokerResource));
+#endif
SRMDeInitSecureResources();
"/a/led",
0,
NULL,
- 128));// invalid bitmask for OCResourceProperty
+ 255));// invalid bitmask for OCResourceProperty
EXPECT_EQ(OC_STACK_OK, OCStop());
}
const OCPrm_t& pmSel, const std::string& pinNumber,
DirectPairingCallback& resultCallback) = 0;
+#ifdef WITH_MQ
+ virtual OCStackResult ListenForMQTopic(
+ const OCDevAddr& devAddr,
+ const std::string& resourceUri,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ FindCallback& callback, QualityOfService QoS) = 0;
+
+ virtual OCStackResult PutMQTopicRepresentation(
+ const OCDevAddr& devAddr,
+ const std::string& uri,
+ const OCRepresentation& rep,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ MQCreateTopicCallback& callback, QualityOfService QoS) = 0;
+#endif
virtual ~IClientWrapper(){}
};
}
DirectPairingContext(DirectPairingCallback cb) : callback(cb){}
};
+
+#ifdef WITH_MQ
+ struct CreateMQTopicContext
+ {
+ MQCreateTopicCallback callback;
+ std::weak_ptr<IClientWrapper> clientWrapper;
+ CreateMQTopicContext(MQCreateTopicCallback cb, std::weak_ptr<IClientWrapper> cw)
+ : callback(cb), clientWrapper(cw){}
+ };
+#endif
}
class InProcClientWrapper : public IClientWrapper
virtual OCStackResult DoDirectPairing(std::shared_ptr<OCDirectPairing> peer, const OCPrm_t& pmSel,
const std::string& pinNumber, DirectPairingCallback& resultCallback);
+#ifdef WITH_MQ
+ virtual OCStackResult ListenForMQTopic(
+ const OCDevAddr& devAddr,
+ const std::string& resourceUri,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ FindCallback& callback, QualityOfService QoS);
+
+ virtual OCStackResult PutMQTopicRepresentation(
+ const OCDevAddr& devAddr,
+ const std::string& uri,
+ const OCRepresentation& rep,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ MQCreateTopicCallback& callback, QualityOfService QoS);
+#endif
+
private:
void listeningFunc();
std::string assembleSetResourceUri(std::string uri, const QueryParamsMap& queryParams);
typedef std::function<void(const PairedDevices&)> GetDirectPairedCallback;
+ typedef std::function<void(const HeaderOptions&,
+ const OCRepresentation&, const int,
+ std::shared_ptr<OCResource>)> MQCreateTopicCallback;
+
} // namespace OC
#endif
*/
std::string sid() const;
+#ifdef WITH_MQ
+ /**
+ * Function to discovery Topics from MQ Broker.
+ *
+ * @param queryParametersMap map which can have the query parameter name and value
+ * @param attributeHandler handles callback
+
+ * @return Returns ::OC_STACK_OK on success, some other value upon failure.
+ * @note OCStackResult is defined in ocstack.h.
+ *
+ */
+ OCStackResult discoveryMQTopics(const QueryParamsMap& queryParametersMap,
+ FindCallback attributeHandler);
+ /**
+ * Function to create Topic into MQ Broker.
+ * SubTopic is also created through this method.
+ *
+ * @param rep representation of the topic
+ * @param topicUri new uri of the topic which want to create
+ * @param queryParametersMap map which can have the query parameter name and value
+ * @param attributeHandler handles callback
+
+ * @return Returns ::OC_STACK_OK on success, some other value upon failure.
+ * @note OCStackResult is defined in ocstack.h.
+ *
+ */
+ OCStackResult createMQTopic(const OCRepresentation& rep,
+ const std::string& topicUri,
+ const QueryParamsMap& queryParametersMap,
+ MQCreateTopicCallback attributeHandler);
+#endif
+#ifdef MQ_SUBSCRIBER
+ /**
+ * Function to subscribe Topic to MQ Broker.
+ *
+ * @param observeType allows the client to specify how it wants to observe.
+ * @param queryParametersMap map which can have the query parameter name and value
+ * @param observeHandler handles callback
+
+ * @return Returns ::OC_STACK_OK on success, some other value upon failure.
+ * @note OCStackResult is defined in ocstack.h.
+ *
+ */
+ OCStackResult subscribeMQTopic(ObserveType observeType,
+ const QueryParamsMap& queryParametersMap,
+ ObserveCallback observeHandler);
+
+ /**
+ * Function to unsubscribe Topic to MQ Broker.
+ *
+ * @return Returns ::OC_STACK_OK on success, some other value upon failure.
+ * @note OCStackResult is defined in ocstack.h.
+ *
+ */
+ OCStackResult unsubscribeMQTopic();
+
+ /**
+ * Function to request publish to MQ publisher.
+ * Publisher can confirm the request message as key:"req_pub" and value:"true".
+ *
+ * @param queryParametersMap map which can have the query parameter name and value
+ * @param attributeHandler handles callback
+
+ * @return Returns ::OC_STACK_OK on success, some other value upon failure.
+ * @note OCStackResult is defined in ocstack.h.
+ *
+ */
+ OCStackResult requestMQPublish(const QueryParamsMap& queryParametersMap,
+ PostCallback attributeHandler);
+#endif
+#ifdef MQ_PUBLISHER
+ /**
+ * Function to publish Topic information into MQ Broker.
+ *
+ * @param rep representation of the topic
+ * @param queryParametersMap map which can have the query parameter name and value
+ * @param attributeHandler handles callback
+
+ * @return Returns ::OC_STACK_OK on success, some other value upon failure.
+ * @note OCStackResult is defined in ocstack.h.
+ *
+ */
+ OCStackResult publishMQTopic(const OCRepresentation& rep,
+ const QueryParamsMap& queryParametersMap,
+ PostCallback attributeHandler);
+#endif
// overloaded operators allow for putting into a 'set'
// the uniqueidentifier allows for putting into a hash
bool operator==(const OCResource &other) const;
if (res->port != 0)
{
- m_devAddr.port = res->port;
+ m_devAddr.port = res->port;
}
+
if (payload->baseURI)
{
OCDevAddr rdPubAddr = m_devAddr;
}
}
+#ifdef WITH_MQ
+ ListenOCContainer(std::weak_ptr<IClientWrapper> cw,
+ OCDevAddr& devAddr, OCRepPayload* payload)
+ : m_clientWrapper(cw), m_devAddr(devAddr)
+ {
+ if (payload)
+ {
+ char**topicList = nullptr;
+ size_t dimensions[MAX_REP_ARRAY_DEPTH] = {0};
+ OCRepPayloadGetStringArray(payload, "topiclist", &topicList, dimensions);
+
+ for(size_t idx = 0; idx < dimensions[0]; idx++)
+ {
+ m_resources.push_back(std::shared_ptr<OC::OCResource>(
+ new OC::OCResource(m_clientWrapper, m_devAddr,
+ std::string(topicList[idx]),
+ "",
+ OC_OBSERVABLE,
+ {OC_RSRVD_RESOURCE_TYPE_MQ_TOPIC},
+ {DEFAULT_INTERFACE})));
+ }
+ }
+ }
+
+ ListenOCContainer(std::weak_ptr<IClientWrapper> cw,
+ OCDevAddr& devAddr, const std::string& topicUri)
+ : m_clientWrapper(cw), m_devAddr(devAddr)
+ {
+ m_resources.push_back(std::shared_ptr<OC::OCResource>(
+ new OC::OCResource(m_clientWrapper, m_devAddr,
+ topicUri,
+ "",
+ OC_OBSERVABLE,
+ {OC_RSRVD_RESOURCE_TYPE_MQ_TOPIC},
+ {DEFAULT_INTERFACE})));
+ }
+#endif
+
const std::vector<std::shared_ptr<OCResource>>& Resources() const
{
return m_resources;
const OCPrm_t& /*pmSel*/,
const std::string& /*pinNumber*/, DirectPairingCallback& /*resultCallback*/)
{return OC_STACK_NOTIMPL;}
+
+#ifdef WITH_MQ
+ virtual OCStackResult ListenForMQTopic(const OCDevAddr& /*devAddr*/,
+ const std::string& /*resourceUri*/,
+ const QueryParamsMap& /*queryParams*/,
+ const HeaderOptions& /*headerOptions*/,
+ FindCallback& /*callback*/,
+ QualityOfService /*QoS*/)
+ {return OC_STACK_NOTIMPL;}
+
+ virtual OCStackResult PutMQTopicRepresentation(const OCDevAddr& /*devAddr*/,
+ const std::string& /*uri*/,
+ const OCRepresentation& /*rep*/,
+ const QueryParamsMap& /*queryParams*/,
+ const HeaderOptions& /*headerOptions*/,
+ MQCreateTopicCallback& /*callback*/,
+ QualityOfService /*QoS*/)
+ {return OC_STACK_NOTIMPL;}
+#endif
};
}
ListenOCContainer container(clientWrapper, clientResponse->devAddr,
reinterpret_cast<OCDiscoveryPayload*>(clientResponse->payload));
// loop to ensure valid construction of all resources
+
for(auto resource : container.Resources())
{
std::thread exec(context->callback, resource);
}
return result;
}
+#ifdef WITH_MQ
+ OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
+ OCClientResponse* clientResponse)
+ {
+ ClientCallbackContext::ListenContext* context =
+ static_cast<ClientCallbackContext::ListenContext*>(ctx);
+
+ if (!clientResponse)
+ {
+ return OC_STACK_KEEP_TRANSACTION;
+ }
+
+ if (clientResponse->result != OC_STACK_OK)
+ {
+ oclog() << "listenMQCallback(): failed to create resource. clientResponse: "
+ << clientResponse->result
+ << std::flush;
+
+ return OC_STACK_KEEP_TRANSACTION;
+ }
+
+ auto clientWrapper = context->clientWrapper.lock();
+ if (!clientWrapper)
+ {
+ oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
+ << std::flush;
+ return OC_STACK_KEEP_TRANSACTION;
+ }
+
+ try{
+ ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+ (OCRepPayload *) clientResponse->payload);
+
+ // loop to ensure valid construction of all resources
+ for (auto resource : container.Resources())
+ {
+ std::thread exec(context->callback, resource);
+ exec.detach();
+ }
+ }
+ catch (std::exception &e){
+ oclog() << "Exception in listCallback, ignoring response: "
+ << e.what() << std::flush;
+ }
+
+
+ return OC_STACK_KEEP_TRANSACTION;
+ }
+
+ OCStackResult InProcClientWrapper::ListenForMQTopic(
+ const OCDevAddr& devAddr,
+ const std::string& resourceUri,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ FindCallback& callback, QualityOfService QoS)
+ {
+ oclog() << "ListenForMQTopic()" << std::flush;
+
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+
+ ClientCallbackContext::ListenContext* context =
+ new ClientCallbackContext::ListenContext(callback, shared_from_this());
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(context),
+ cbdata.cb = listenMQCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
+
+ std::string uri = assembleSetResourceUri(resourceUri, queryParams);
+
+ OCStackResult result = OC_STACK_ERROR;
+ auto cLock = m_csdkLock.lock();
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+ OCHeaderOption options[MAX_HEADER_OPTIONS];
+ result = OCDoResource(
+ nullptr, OC_REST_GET,
+ uri.c_str(),
+ &devAddr, nullptr,
+ CT_DEFAULT,
+ static_cast<OCQualityOfService>(QoS),
+ &cbdata,
+ assembleHeaderOptions(options, headerOptions),
+ headerOptions.size());
+ }
+ else
+ {
+ delete context;
+ }
+
+ return result;
+ }
+#endif
OCStackApplicationResult listenDeviceCallback(void* ctx,
OCDoHandle /*handle*/,
}
}
+#ifdef WITH_MQ
+ OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
+ OCClientResponse* clientResponse)
+ {
+ ClientCallbackContext::CreateMQTopicContext* context =
+ static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
+ OCRepresentation rep;
+ HeaderOptions serverHeaderOptions;
+
+ if (!clientResponse)
+ {
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ std::string createdUri;
+ OCStackResult result = clientResponse->result;
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ parseServerHeaderOptions(clientResponse, serverHeaderOptions);
+ try
+ {
+ rep = parseGetSetCallback(clientResponse);
+ }
+ catch(OC::OCException& e)
+ {
+ result = e.code();
+ }
+
+ bool isLocationOption = false;
+ for (auto headerOption : serverHeaderOptions)
+ {
+ if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
+ {
+ createdUri += "/";
+ createdUri += headerOption.getOptionData();
+ if (!isLocationOption)
+ {
+ isLocationOption = true;
+ }
+ }
+ }
+
+ if (!isLocationOption)
+ {
+ createdUri = clientResponse->resourceUri;
+ }
+ }
+
+ auto clientWrapper = context->clientWrapper.lock();
+
+ if (!clientWrapper)
+ {
+ oclog() << "createMQTopicCallback(): failed to get a shared_ptr to the client wrapper"
+ << std::flush;
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ try{
+ if (OC_STACK_OK == result ||
+ OC_STACK_RESOURCE_CREATED == result)
+ {
+ ListenOCContainer container(clientWrapper, clientResponse->devAddr,
+ createdUri);
+ for (auto resource : container.Resources())
+ {
+ std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
+ exec.detach();
+ }
+ }
+ else
+ {
+ std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
+ exec.detach();
+ }
+ }
+ catch (std::exception &e){
+ oclog() << "Exception in createMQTopicCallback, ignoring response: "
+ << e.what() << std::flush;
+ }
+ return OC_STACK_DELETE_TRANSACTION;
+ }
+
+ OCStackResult InProcClientWrapper::PutMQTopicRepresentation(
+ const OCDevAddr& devAddr,
+ const std::string& uri,
+ const OCRepresentation& rep,
+ const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
+ MQCreateTopicCallback& callback, QualityOfService QoS)
+ {
+ if (!callback)
+ {
+ return OC_STACK_INVALID_PARAM;
+ }
+ OCStackResult result;
+ ClientCallbackContext::CreateMQTopicContext* ctx =
+ new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
+ OCCallbackData cbdata;
+ cbdata.context = static_cast<void*>(ctx),
+ cbdata.cb = createMQTopicCallback;
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
+
+ std::string url = assembleSetResourceUri(uri, queryParams);
+
+ auto cLock = m_csdkLock.lock();
+
+ if (cLock)
+ {
+ std::lock_guard<std::recursive_mutex> lock(*cLock);
+ OCHeaderOption options[MAX_HEADER_OPTIONS];
+
+ result = OCDoResource(nullptr, OC_REST_PUT,
+ url.c_str(), &devAddr,
+ assembleSetResourcePayload(rep),
+ CT_DEFAULT,
+ static_cast<OCQualityOfService>(QoS),
+ &cbdata,
+ assembleHeaderOptions(options, headerOptions),
+ headerOptions.size());
+ }
+ else
+ {
+ delete ctx;
+ result = OC_STACK_ERROR;
+ }
+
+ return result;
+ }
+#endif
OCStackApplicationResult getResourceCallback(void* ctx,
OCDoHandle /*handle*/,
OCClientResponse* clientResponse)
return this->uniqueIdentifier().m_representation;
}
+#ifdef WITH_MQ
+OCStackResult OCResource::discoveryMQTopics(const QueryParamsMap& queryParametersMap,
+ FindCallback attributeHandler)
+{
+ QualityOfService defaultQos = OC::QualityOfService::NaQos;
+ checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
+ return checked_guard(m_clientWrapper.lock(),
+ &IClientWrapper::ListenForMQTopic,
+ m_devAddr, m_uri,
+ queryParametersMap, m_headerOptions,
+ attributeHandler, defaultQos);
+}
+
+OCStackResult OCResource::createMQTopic(const OCRepresentation& rep,
+ const std::string& topicUri,
+ const QueryParamsMap& queryParametersMap,
+ MQCreateTopicCallback attributeHandler)
+{
+ QualityOfService defaultQos = OC::QualityOfService::NaQos;
+ checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
+ return checked_guard(m_clientWrapper.lock(), &IClientWrapper::PutMQTopicRepresentation,
+ m_devAddr, topicUri, rep, queryParametersMap,
+ m_headerOptions, attributeHandler, defaultQos);
+}
+#endif
+#ifdef MQ_SUBSCRIBER
+OCStackResult OCResource::subscribeMQTopic(ObserveType observeType,
+ const QueryParamsMap& queryParametersMap, ObserveCallback observeHandler)
+{
+ QualityOfService defaultQoS = OC::QualityOfService::NaQos;
+ checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQoS);
+
+ return result_guard(observe(observeType, queryParametersMap, observeHandler, defaultQoS));
+}
+
+OCStackResult OCResource::unsubscribeMQTopic()
+{
+ QualityOfService defaultQoS = OC::QualityOfService::NaQos;
+ checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQoS);
+ return result_guard(cancelObserve(defaultQoS));
+}
+
+OCStackResult OCResource::requestMQPublish(const QueryParamsMap& queryParametersMap,
+ PostCallback attributeHandler)
+{
+ OCRepresentation rep;
+ rep.setValue(std::string("req_pub"), std::string("true"));
+ QualityOfService defaultQos = OC::QualityOfService::NaQos;
+ checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
+ return result_guard(post(rep, queryParametersMap, attributeHandler, defaultQos));
+}
+#endif
+#ifdef MQ_PUBLISHER
+OCStackResult OCResource::publishMQTopic(const OCRepresentation& rep,
+ const QueryParamsMap& queryParametersMap,
+ PostCallback attributeHandler)
+{
+ QualityOfService defaultQos = OC::QualityOfService::NaQos;
+ checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
+ return result_guard(post(rep, queryParametersMap, attributeHandler, defaultQos));
+}
+#endif
+
bool OCResource::operator==(const OCResource &other) const
{
return m_resourceId == other.m_resourceId;