- add quality of service param in Cloud MQ client.
- combine callbacks for some APIs like 'createMQTopic','discoveryTopics'
because both APIs need same callback format which give OCResource Obj.
Change-Id: Ie6eb734e10387216f6b1b6d1e5c122668ddcee6b
Signed-off-by: jihwan.seo <jihwan.seo@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/9991
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Jaehong Jo <jaehong.jo@samsung.com>
Reviewed-by: Ashok Babu Channa <ashok.channa@samsung.com>
onTopicFoundListener->foundTopicCallback(eCode, uri, resource);
};
- return m_sharedResource->discoveryMQTopics(queryParametersMap, findCallback);
+ return m_sharedResource->discoveryMQTopics(queryParametersMap, findCallback, QoS);
}
OCStackResult JniOcResource::createMQTopic(JNIEnv* env,
return m_sharedResource->createMQTopic(representation, targetUri,
queryParametersMap,
- createCallback);
+ createCallback, QoS);
}
#endif
#ifdef MQ_SUBSCRIBER
};
return m_sharedResource->subscribeMQTopic(ObserveType::Observe, queryParametersMap,
- subscribeCallback);
+ subscribeCallback, QoS);
}
OCStackResult JniOcResource::unsubscribeMQTopic(QualityOfService QoS)
{
- return m_sharedResource->unsubscribeMQTopic();
+ return m_sharedResource->unsubscribeMQTopic(QoS);
}
OCStackResult JniOcResource::requestMQPublish(JNIEnv* env,
onPostListener->onPostCallback(opts, rep, eCode);
};
- return m_sharedResource->requestMQPublish(queryParametersMap, postCallback);
+ return m_sharedResource->requestMQPublish(queryParametersMap, postCallback, QoS);
}
#endif
#ifdef MQ_PUBLISHER
};
return m_sharedResource->publishMQTopic(representation, queryParametersMap,
- postCallback);
+ postCallback, QoS);
}
#endif
const OCDevAddr& devAddr,
const std::string& resourceUri,
const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
- FindCallback& callback, QualityOfService QoS) = 0;
+ MQTopicCallback& callback, QualityOfService QoS) = 0;
virtual OCStackResult PutMQTopicRepresentation(
const OCDevAddr& devAddr,
const std::string& uri,
const OCRepresentation& rep,
const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
- MQCreateTopicCallback& callback, QualityOfService QoS) = 0;
+ MQTopicCallback& callback, QualityOfService QoS) = 0;
#endif
virtual ~IClientWrapper(){}
};
};
#ifdef WITH_MQ
- struct CreateMQTopicContext
+ struct MQTopicContext
{
- MQCreateTopicCallback callback;
+ MQTopicCallback callback;
std::weak_ptr<IClientWrapper> clientWrapper;
- CreateMQTopicContext(MQCreateTopicCallback cb, std::weak_ptr<IClientWrapper> cw)
+ MQTopicContext(MQTopicCallback cb, std::weak_ptr<IClientWrapper> cw)
: callback(cb), clientWrapper(cw){}
};
#endif
const OCDevAddr& devAddr,
const std::string& resourceUri,
const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
- FindCallback& callback, QualityOfService QoS);
+ MQTopicCallback& callback, QualityOfService QoS);
virtual OCStackResult PutMQTopicRepresentation(
const OCDevAddr& devAddr,
const std::string& uri,
const OCRepresentation& rep,
const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
- MQCreateTopicCallback& callback, QualityOfService QoS);
+ MQTopicCallback& callback, QualityOfService QoS);
#endif
private:
typedef std::function<void(const PairedDevices&)> GetDirectPairedCallback;
- typedef std::function<void(const HeaderOptions&,
- const OCRepresentation&, const int,
- std::shared_ptr<OCResource>)> MQCreateTopicCallback;
+ typedef std::function<void(const int, const std::string&,
+ std::shared_ptr<OCResource>)> MQTopicCallback;
#ifdef RD_CLIENT
typedef std::function<void(const OCRepresentation&, const int)> PublishResourceCallback;
*
* @param queryParametersMap map which can have the query parameter name and value
* @param attributeHandler handles callback
-
+ * @param qos the quality of communication
+ *
* @return Returns ::OC_STACK_OK on success, some other value upon failure.
* @note OCStackResult is defined in ocstack.h.
*
*/
OCStackResult discoveryMQTopics(const QueryParamsMap& queryParametersMap,
- FindCallback attributeHandler);
+ MQTopicCallback attributeHandler,
+ QualityOfService qos);
/**
* Function to create Topic into MQ Broker.
* SubTopic is also created through this method.
* @param topicUri new uri of the topic which want to create
* @param queryParametersMap map which can have the query parameter name and value
* @param attributeHandler handles callback
-
+ * @param qos the quality of communication
+ *
* @return Returns ::OC_STACK_OK on success, some other value upon failure.
* @note OCStackResult is defined in ocstack.h.
*
OCStackResult createMQTopic(const OCRepresentation& rep,
const std::string& topicUri,
const QueryParamsMap& queryParametersMap,
- MQCreateTopicCallback attributeHandler);
+ MQTopicCallback attributeHandler,
+ QualityOfService qos);
#endif
#ifdef MQ_SUBSCRIBER
/**
* @param observeType allows the client to specify how it wants to observe.
* @param queryParametersMap map which can have the query parameter name and value
* @param observeHandler handles callback
-
+ * @param qos the quality of communication
+ *
* @return Returns ::OC_STACK_OK on success, some other value upon failure.
* @note OCStackResult is defined in ocstack.h.
*
*/
OCStackResult subscribeMQTopic(ObserveType observeType,
const QueryParamsMap& queryParametersMap,
- ObserveCallback observeHandler);
+ ObserveCallback observeHandler,
+ QualityOfService qos);
/**
* Function to unsubscribe Topic to MQ Broker.
*
+ * @param qos the quality of communication
+ *
* @return Returns ::OC_STACK_OK on success, some other value upon failure.
* @note OCStackResult is defined in ocstack.h.
*
*/
- OCStackResult unsubscribeMQTopic();
+ OCStackResult unsubscribeMQTopic(QualityOfService qos);
/**
* Function to request publish to MQ publisher.
*
* @param queryParametersMap map which can have the query parameter name and value
* @param attributeHandler handles callback
-
+ * @param qos the quality of communication
+ *
* @return Returns ::OC_STACK_OK on success, some other value upon failure.
* @note OCStackResult is defined in ocstack.h.
*
*/
OCStackResult requestMQPublish(const QueryParamsMap& queryParametersMap,
- PostCallback attributeHandler);
+ PostCallback attributeHandler,
+ QualityOfService qos);
#endif
#ifdef MQ_PUBLISHER
/**
* @param rep representation of the topic
* @param queryParametersMap map which can have the query parameter name and value
* @param attributeHandler handles callback
-
+ * @param qos the quality of communication
+ *
* @return Returns ::OC_STACK_OK on success, some other value upon failure.
* @note OCStackResult is defined in ocstack.h.
*
*/
OCStackResult publishMQTopic(const OCRepresentation& rep,
const QueryParamsMap& queryParametersMap,
- PostCallback attributeHandler);
+ PostCallback attributeHandler,
+ QualityOfService qos);
#endif
// overloaded operators allow for putting into a 'set'
// the uniqueidentifier allows for putting into a hash
const std::string& /*resourceUri*/,
const QueryParamsMap& /*queryParams*/,
const HeaderOptions& /*headerOptions*/,
- FindCallback& /*callback*/,
+ MQTopicCallback& /*callback*/,
QualityOfService /*QoS*/)
{return OC_STACK_NOTIMPL;}
const OCRepresentation& /*rep*/,
const QueryParamsMap& /*queryParams*/,
const HeaderOptions& /*headerOptions*/,
- MQCreateTopicCallback& /*callback*/,
+ MQTopicCallback& /*callback*/,
QualityOfService /*QoS*/)
{return OC_STACK_NOTIMPL;}
#endif
}
#ifdef WITH_MQ
OCStackApplicationResult listenMQCallback(void* ctx, OCDoHandle /*handle*/,
- OCClientResponse* clientResponse)
+ OCClientResponse* clientResponse)
{
- ClientCallbackContext::ListenContext* context =
- static_cast<ClientCallbackContext::ListenContext*>(ctx);
+ ClientCallbackContext::MQTopicContext* context =
+ static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
- if (!clientResponse)
+ if (!clientResponse || !context)
{
- return OC_STACK_KEEP_TRANSACTION;
+ return OC_STACK_DELETE_TRANSACTION;
}
if (clientResponse->result != OC_STACK_OK)
<< clientResponse->result
<< std::flush;
- return OC_STACK_KEEP_TRANSACTION;
+ std::thread exec(context->callback, clientResponse->result,
+ clientResponse->resourceUri, nullptr);
+ exec.detach();
+
+ return OC_STACK_DELETE_TRANSACTION;
}
auto clientWrapper = context->clientWrapper.lock();
{
oclog() << "listenMQCallback(): failed to get a shared_ptr to the client wrapper"
<< std::flush;
- return OC_STACK_KEEP_TRANSACTION;
+ return OC_STACK_DELETE_TRANSACTION;
}
try{
// loop to ensure valid construction of all resources
for (auto resource : container.Resources())
{
- std::thread exec(context->callback, resource);
+ std::thread exec(context->callback, clientResponse->result,
+ clientResponse->resourceUri, resource);
exec.detach();
}
}
<< e.what() << std::flush;
}
-
- return OC_STACK_KEEP_TRANSACTION;
+ return OC_STACK_DELETE_TRANSACTION;
}
- OCStackResult InProcClientWrapper::ListenForMQTopic(
- const OCDevAddr& devAddr,
- const std::string& resourceUri,
- const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
- FindCallback& callback, QualityOfService QoS)
+ OCStackResult InProcClientWrapper::ListenForMQTopic(const OCDevAddr& devAddr,
+ const std::string& resourceUri,
+ const QueryParamsMap& queryParams,
+ const HeaderOptions& headerOptions,
+ MQTopicCallback& callback,
+ QualityOfService QoS)
{
oclog() << "ListenForMQTopic()" << std::flush;
return OC_STACK_INVALID_PARAM;
}
- ClientCallbackContext::ListenContext* context =
- new ClientCallbackContext::ListenContext(callback, shared_from_this());
+ ClientCallbackContext::MQTopicContext* context =
+ new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
OCCallbackData cbdata;
cbdata.context = static_cast<void*>(context),
cbdata.cb = listenMQCallback;
- cbdata.cd = [](void* c){delete (ClientCallbackContext::ListenContext*)c;};
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
std::string uri = assembleSetResourceUri(resourceUri, queryParams);
OCStackApplicationResult createMQTopicCallback(void* ctx, OCDoHandle /*handle*/,
OCClientResponse* clientResponse)
{
- ClientCallbackContext::CreateMQTopicContext* context =
- static_cast<ClientCallbackContext::CreateMQTopicContext*>(ctx);
- OCRepresentation rep;
+ ClientCallbackContext::MQTopicContext* context =
+ static_cast<ClientCallbackContext::MQTopicContext*>(ctx);
HeaderOptions serverHeaderOptions;
- if (!clientResponse)
+ if (!clientResponse || !context)
{
return OC_STACK_DELETE_TRANSACTION;
}
std::string createdUri;
+ bool isLocationOption = false;
OCStackResult result = clientResponse->result;
if (OC_STACK_OK == result ||
OC_STACK_RESOURCE_CREATED == result)
{
parseServerHeaderOptions(clientResponse, serverHeaderOptions);
- try
- {
- rep = parseGetSetCallback(clientResponse);
- }
- catch(OC::OCException& e)
- {
- result = e.code();
- }
- bool isLocationOption = false;
for (auto headerOption : serverHeaderOptions)
{
if (HeaderOption::LOCATION_PATH_OPTION_ID == headerOption.getOptionID())
}
}
}
+ }
- if (!isLocationOption)
- {
- createdUri = clientResponse->resourceUri;
- }
+ if (!isLocationOption)
+ {
+ createdUri = clientResponse->resourceUri;
}
auto clientWrapper = context->clientWrapper.lock();
createdUri);
for (auto resource : container.Resources())
{
- std::thread exec(context->callback, serverHeaderOptions, rep, result, resource);
+ std::thread exec(context->callback, result, createdUri, resource);
exec.detach();
}
}
else
{
- std::thread exec(context->callback, serverHeaderOptions, rep, result, nullptr);
+ std::thread exec(context->callback, result, createdUri, nullptr);
exec.detach();
}
}
const std::string& uri,
const OCRepresentation& rep,
const QueryParamsMap& queryParams, const HeaderOptions& headerOptions,
- MQCreateTopicCallback& callback, QualityOfService QoS)
+ MQTopicCallback& callback, QualityOfService QoS)
{
if (!callback)
{
return OC_STACK_INVALID_PARAM;
}
OCStackResult result;
- ClientCallbackContext::CreateMQTopicContext* ctx =
- new ClientCallbackContext::CreateMQTopicContext(callback, shared_from_this());
+ ClientCallbackContext::MQTopicContext* ctx =
+ new ClientCallbackContext::MQTopicContext(callback, shared_from_this());
OCCallbackData cbdata;
cbdata.context = static_cast<void*>(ctx),
cbdata.cb = createMQTopicCallback;
- cbdata.cd = [](void* c){delete (ClientCallbackContext::CreateMQTopicContext*)c;};
+ cbdata.cd = [](void* c){delete (ClientCallbackContext::MQTopicContext*)c;};
std::string url = assembleSetResourceUri(uri, queryParams);
#ifdef WITH_MQ
OCStackResult OCResource::discoveryMQTopics(const QueryParamsMap& queryParametersMap,
- FindCallback attributeHandler)
+ MQTopicCallback attributeHandler,
+ QualityOfService qos)
{
- QualityOfService defaultQos = OC::QualityOfService::NaQos;
- checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
return checked_guard(m_clientWrapper.lock(),
&IClientWrapper::ListenForMQTopic,
m_devAddr, m_uri,
queryParametersMap, m_headerOptions,
- attributeHandler, defaultQos);
+ attributeHandler, qos);
}
OCStackResult OCResource::createMQTopic(const OCRepresentation& rep,
const std::string& topicUri,
const QueryParamsMap& queryParametersMap,
- MQCreateTopicCallback attributeHandler)
+ MQTopicCallback attributeHandler,
+ QualityOfService qos)
{
- QualityOfService defaultQos = OC::QualityOfService::NaQos;
- checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
return checked_guard(m_clientWrapper.lock(), &IClientWrapper::PutMQTopicRepresentation,
m_devAddr, topicUri, rep, queryParametersMap,
- m_headerOptions, attributeHandler, defaultQos);
+ m_headerOptions, attributeHandler, qos);
}
#endif
#ifdef MQ_SUBSCRIBER
OCStackResult OCResource::subscribeMQTopic(ObserveType observeType,
- const QueryParamsMap& queryParametersMap, ObserveCallback observeHandler)
+ const QueryParamsMap& queryParametersMap,
+ ObserveCallback observeHandler,
+ QualityOfService qos)
{
- QualityOfService defaultQoS = OC::QualityOfService::NaQos;
- checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQoS);
-
- return result_guard(observe(observeType, queryParametersMap, observeHandler, defaultQoS));
+ return result_guard(observe(observeType, queryParametersMap, observeHandler, qos));
}
-OCStackResult OCResource::unsubscribeMQTopic()
+OCStackResult OCResource::unsubscribeMQTopic(QualityOfService qos)
{
- QualityOfService defaultQoS = OC::QualityOfService::NaQos;
- checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQoS);
- return result_guard(cancelObserve(defaultQoS));
+ return result_guard(cancelObserve(qos));
}
OCStackResult OCResource::requestMQPublish(const QueryParamsMap& queryParametersMap,
- PostCallback attributeHandler)
+ PostCallback attributeHandler,
+ QualityOfService qos)
{
OCRepresentation rep;
rep.setValue(std::string("req_pub"), std::string("true"));
- QualityOfService defaultQos = OC::QualityOfService::NaQos;
- checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
- return result_guard(post(rep, queryParametersMap, attributeHandler, defaultQos));
+ return result_guard(post(rep, queryParametersMap, attributeHandler, qos));
}
#endif
#ifdef MQ_PUBLISHER
OCStackResult OCResource::publishMQTopic(const OCRepresentation& rep,
const QueryParamsMap& queryParametersMap,
- PostCallback attributeHandler)
+ PostCallback attributeHandler,
+ QualityOfService qos)
{
- QualityOfService defaultQos = OC::QualityOfService::NaQos;
- checked_guard(m_clientWrapper.lock(), &IClientWrapper::GetDefaultQos, defaultQos);
- return result_guard(post(rep, queryParametersMap, attributeHandler, defaultQos));
+ return result_guard(post(rep, queryParametersMap, attributeHandler, qos));
}
#endif