From 5e7e411358dca7f4ac85a71c633f8ee1723bdbcc Mon Sep 17 00:00:00 2001 From: "jaesick.shin" Date: Tue, 1 Nov 2016 14:05:13 +0900 Subject: [PATCH] [IoT-1523] Required MQ Functionality for Notification Service. this patch include, 1. Add logic for supported MQ in provider side. 2. split common function. second patch include, add #ifdef ~ #endif. Change-Id: I8c8b419eca7907f5503ebb2d2dce9b3caec592d8 Signed-off-by: jaesick.shin Reviewed-on: https://gerrit.iotivity.org/gerrit/13927 Tested-by: jenkins-iotivity Reviewed-by: Uze Choi Tested-by: Uze Choi --- service/notification/include/NSCommon.h | 2 +- service/notification/include/NSProviderInterface.h | 11 +- service/notification/src/common/NSConstants.h | 7 +- service/notification/src/common/NSStructs.h | 16 ++ service/notification/src/common/NSUtil.c | 83 +++++++++++ service/notification/src/common/NSUtil.h | 4 + .../notification/src/consumer/NSConsumerCommon.c | 67 --------- .../notification/src/consumer/NSConsumerCommon.h | 10 -- .../src/consumer/NSConsumerInterface.c | 19 +-- .../notification/src/consumer/NSConsumerMQPlugin.c | 4 +- .../src/provider/NSProviderInterface.c | 42 +++++- .../notification/src/provider/NSProviderListener.c | 164 +++++++++++++++++++++ .../notification/src/provider/NSProviderListener.h | 6 + .../src/provider/NSProviderNotification.c | 34 ++++- .../src/provider/NSProviderSubscription.c | 29 ++++ .../notification/src/provider/NSProviderSystem.c | 22 +++ .../notification/src/provider/NSProviderSystem.h | 5 + 17 files changed, 418 insertions(+), 107 deletions(-) diff --git a/service/notification/include/NSCommon.h b/service/notification/include/NSCommon.h index ff4f231..f44dc45 100644 --- a/service/notification/include/NSCommon.h +++ b/service/notification/include/NSCommon.h @@ -31,7 +31,7 @@ #include #define NS_UUID_STRING_SIZE 37 - +//#define WITH_MQ /** * Result code of notification service */ diff --git a/service/notification/include/NSProviderInterface.h b/service/notification/include/NSProviderInterface.h index 0616233..260c554 100644 --- a/service/notification/include/NSProviderInterface.h +++ b/service/notification/include/NSProviderInterface.h @@ -35,7 +35,6 @@ extern "C" #include "NSCommon.h" #include #include - /** * Invoked when provider receives the subscription request of consumer. * @param[in] consumer Consumer who subscribes the notification message resource @@ -99,6 +98,16 @@ NSResult NSProviderEnableRemoteService(char * serverAddress); */ NSResult NSProviderDisableRemoteService(char * serverAddress); +#ifdef WITH_MQ +/** + * Request to subscribe to remote MQ address as parameter. + * @param[in] server address combined with IP address and port number and MQ broker uri using delimiter : + * @param[in] topicName the interest Topic name for subscription. + * @return ::NS_OK or result code of NSResult + */ +NSResult NSProviderSubscribeMQService(const char * serverAddress, const char * topicName); +#endif + /** * Send notification message to all subscribers * @param[in] message Notification message including id, title, contentText diff --git a/service/notification/src/common/NSConstants.h b/service/notification/src/common/NSConstants.h index b86b93d..84eba38 100644 --- a/service/notification/src/common/NSConstants.h +++ b/service/notification/src/common/NSConstants.h @@ -21,7 +21,7 @@ #ifndef _NS_CONSTANTS_H_ #define _NS_CONSTANTS_H_ -#define __PRINTLOG 0 +#define __PRINTLOG 1 #define __NS_FILE__ ( strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__ ) #ifdef TB_LOG @@ -102,6 +102,11 @@ #define NS_RD_PUBLISH_QUERY "/oic/rd?rt=oic.wk.rdpub" +#ifdef WITH_MQ +#define NS_ATTRIBUTE_MQ_MESSAGE "message" +#define NS_ATTIRBUTE_MQ_TOPICLIST "topiclist" +#endif + #define NS_VERIFY_NOT_NULL_V(obj) \ { \ if ((obj) == NULL) \ diff --git a/service/notification/src/common/NSStructs.h b/service/notification/src/common/NSStructs.h index ceb84a3..e69cdcb 100644 --- a/service/notification/src/common/NSStructs.h +++ b/service/notification/src/common/NSStructs.h @@ -156,4 +156,20 @@ typedef struct } NSProviderInfo; +#ifdef WITH_MQ +typedef struct +{ + char * serverAddr; + char * topicName; + +} NSMQTopicAddress; + +typedef struct +{ + char * serverUri; + OCDevAddr * devAddr; + +} NSMQServerInfo; +#endif + #endif /* _NS_STRUCTS_H_ */ diff --git a/service/notification/src/common/NSUtil.c b/service/notification/src/common/NSUtil.c index 2789366..e3e7c7e 100755 --- a/service/notification/src/common/NSUtil.c +++ b/service/notification/src/common/NSUtil.c @@ -481,3 +481,86 @@ NSTopicList * NSInitializeTopicList() return topicList; } +char * NSGetQueryAddress(const char * serverFullAddress) +{ + if (strstr(serverFullAddress, "coap+tcp://")) + { + return OICStrdup(serverFullAddress+11); + } + else if (strstr(serverFullAddress, "coap://")) + { + return OICStrdup(serverFullAddress+7); + } + else + { + return OICStrdup(serverFullAddress); + } +} + +OCDevAddr * NSChangeAddress(const char * address) +{ + NS_VERIFY_NOT_NULL(address, NULL); + OCDevAddr * retAddr = NULL; + + int index = 0; + while(address[index] != '\0') + { + if (address[index] == ':') + { + break; + } + index++; + } + + if (address[index] == '\0') + { + return NULL; + } + + int tmp = index + 1; + uint16_t port = address[tmp++] - '0'; + + while(true) + { + if (address[tmp] == '\0' || address[tmp] > '9' || address[tmp] < '0') + { + break; + } + port *= 10; + port += address[tmp++] - '0'; + } + + retAddr = (OCDevAddr *) OICMalloc(sizeof(OCDevAddr)); + NS_VERIFY_NOT_NULL(retAddr, NULL); + + retAddr->adapter = OC_ADAPTER_TCP; + OICStrcpy(retAddr->addr, index + 1, address); + retAddr->addr[index] = '\0'; + retAddr->port = port; + retAddr->flags = OC_IP_USE_V6; + + NS_LOG(DEBUG, "Change Address for TCP request"); + NS_LOG_V(DEBUG, "Origin : %s", address); + NS_LOG_V(DEBUG, "Changed Addr : %s", retAddr->addr); + NS_LOG_V(DEBUG, "Changed Port : %d", retAddr->port); + + return retAddr; +} + +bool NSOCResultToSuccess(OCStackResult ret) +{ + switch (ret) + { + case OC_STACK_OK: + case OC_STACK_RESOURCE_CREATED: + case OC_STACK_RESOURCE_DELETED: + case OC_STACK_PRESENCE_STOPPED: + case OC_STACK_CONTINUE: + case OC_STACK_RESOURCE_CHANGED: + return true; + default: + NS_LOG_V(DEBUG, "OCStackResult : %d", (int)ret); + return false; + } +} + diff --git a/service/notification/src/common/NSUtil.h b/service/notification/src/common/NSUtil.h index b6bfd79..bdcc03d 100755 --- a/service/notification/src/common/NSUtil.h +++ b/service/notification/src/common/NSUtil.h @@ -63,4 +63,8 @@ NSMediaContents * NSDuplicateMediaContents(NSMediaContents * copyObj); OCRepPayloadValue* NSPayloadFindValue(const OCRepPayload* payload, const char* name); +char * NSGetQueryAddress(const char * serverFullAddress); +OCDevAddr * NSChangeAddress(const char * address); +bool NSOCResultToSuccess(OCStackResult ret); + #endif /* _NS_UTIL__H_ */ diff --git a/service/notification/src/consumer/NSConsumerCommon.c b/service/notification/src/consumer/NSConsumerCommon.c index 104f2c6..95ca8ab 100644 --- a/service/notification/src/consumer/NSConsumerCommon.c +++ b/service/notification/src/consumer/NSConsumerCommon.c @@ -811,73 +811,6 @@ OCStackResult NSInvokeRequest(OCDoHandle * handle, return ret; } -bool NSOCResultToSuccess(OCStackResult ret) -{ - switch (ret) - { - case OC_STACK_OK: - case OC_STACK_RESOURCE_CREATED: - case OC_STACK_RESOURCE_DELETED: - case OC_STACK_PRESENCE_STOPPED: - case OC_STACK_CONTINUE: - case OC_STACK_RESOURCE_CHANGED: - return true; - default: - NS_LOG_V(DEBUG, "OCStackResult : %d", (int)ret); - return false; - } -} - -OCDevAddr * NSChangeAddress(const char * address) -{ - NS_VERIFY_NOT_NULL(address, NULL); - OCDevAddr * retAddr = NULL; - - int index = 0; - while(address[index] != '\0') - { - if (address[index] == ':') - { - break; - } - index++; - } - - if (address[index] == '\0') - { - return NULL; - } - - int tmp = index + 1; - uint16_t port = address[tmp++] - '0'; - - while(true) - { - if (address[tmp] == '\0' || address[tmp] > '9' || address[tmp] < '0') - { - break; - } - port *= 10; - port += address[tmp++] - '0'; - } - - retAddr = (OCDevAddr *) OICMalloc(sizeof(OCDevAddr)); - NS_VERIFY_NOT_NULL(retAddr, NULL); - - retAddr->adapter = OC_ADAPTER_TCP; - OICStrcpy(retAddr->addr, index + 1, address); - retAddr->addr[index] = '\0'; - retAddr->port = port; - retAddr->flags = OC_IP_USE_V6; - - NS_LOG(DEBUG, "Change Address for TCP request"); - NS_LOG_V(DEBUG, "Origin : %s", address); - NS_LOG_V(DEBUG, "Changed Addr : %s", retAddr->addr); - NS_LOG_V(DEBUG, "Changed Port : %d", retAddr->port); - - return retAddr; -} - bool NSIsExtraValue(const char * name) { if (!strcmp(name, NS_ATTRIBUTE_MESSAGE_ID) || diff --git a/service/notification/src/consumer/NSConsumerCommon.h b/service/notification/src/consumer/NSConsumerCommon.h index 05e87fe..9320625 100644 --- a/service/notification/src/consumer/NSConsumerCommon.h +++ b/service/notification/src/consumer/NSConsumerCommon.h @@ -96,14 +96,6 @@ typedef struct } NSSyncInfo_internal; -#ifdef WITH_MQ -typedef struct -{ - char * serverAddr; - char * topicName; -} NSMQTopicAddress; -#endif - bool NSIsStartedConsumer(); void NSSetIsStartedConsumer(bool setValue); @@ -156,8 +148,6 @@ OCStackResult NSInvokeRequest(OCDoHandle * handle, bool NSOCResultToSuccess(OCStackResult ret); -OCDevAddr * NSChangeAddress(const char * address); - #ifdef __cplusplus } #endif // __cplusplus diff --git a/service/notification/src/consumer/NSConsumerInterface.c b/service/notification/src/consumer/NSConsumerInterface.c index 9eb7bc5..cce720b 100644 --- a/service/notification/src/consumer/NSConsumerInterface.c +++ b/service/notification/src/consumer/NSConsumerInterface.c @@ -27,11 +27,10 @@ #include "NSConsumerCommon.h" #include "NSConstants.h" #include "NSConsumerScheduler.h" +#include "NSUtil.h" #include "oic_malloc.h" #include "oic_string.h" -static char * NSGetQueryAddress(const char * serverFullAddress); - // Public APIs NSResult NSStartConsumer(NSConsumerConfig config) { @@ -241,19 +240,3 @@ NSResult NSConsumerUpdateTopicList(const char * providerId, NSTopicLL * topics) return NSConsumerPushEvent(topicTask); } - -char * NSGetQueryAddress(const char * serverFullAddress) -{ - if (strstr(serverFullAddress, "coap+tcp://")) - { - return OICStrdup(serverFullAddress+11); - } - else if (strstr(serverFullAddress, "coap://")) - { - return OICStrdup(serverFullAddress+7); - } - else - { - return OICStrdup(serverFullAddress); - } -} diff --git a/service/notification/src/consumer/NSConsumerMQPlugin.c b/service/notification/src/consumer/NSConsumerMQPlugin.c index 3012f33..283f10a 100644 --- a/service/notification/src/consumer/NSConsumerMQPlugin.c +++ b/service/notification/src/consumer/NSConsumerMQPlugin.c @@ -22,14 +22,12 @@ #include "NSConstants.h" #include "NSConsumerCommon.h" #include "NSConsumerMQPlugin.h" +#include "NSUtil.h" #include "oic_malloc.h" #include "oic_string.h" #include "ocpayload.h" -#define NS_ATTRIBUTE_MQ_MESSAGE "message" -#define NS_ATTIRBUTE_MQ_TOPICLIST "topiclist" - void NSHandleMQSubscription(NSMQTopicAddress * address); OCStackApplicationResult NSConsumerIntrospectMQTopic( diff --git a/service/notification/src/provider/NSProviderInterface.c b/service/notification/src/provider/NSProviderInterface.c index 7adad28..dad01ba 100644 --- a/service/notification/src/provider/NSProviderInterface.c +++ b/service/notification/src/provider/NSProviderInterface.c @@ -194,6 +194,40 @@ NSResult NSProviderDisableRemoteService(char *serverAddress) return NS_FAIL; } +#ifdef WITH_MQ +NSResult NSProviderSubscribeMQService(const char * serverAddress, const char * topicName) +{ + NS_LOG(DEBUG, "NSProviderSubscribeMQService - IN"); + pthread_mutex_lock(&nsInitMutex); + + if (!initProvider || !serverAddress || !topicName) + { + NS_LOG(DEBUG, "Provider service has not been started yet or set the server " + "address and topicName"); + pthread_mutex_unlock(&nsInitMutex); + return NS_FAIL; + } + + NSMQTopicAddress * topicAddr = (NSMQTopicAddress *)OICMalloc(sizeof(NSMQTopicAddress)); + + if (!topicAddr) + { + NS_LOG(DEBUG, "fail to memory allocate"); + pthread_mutex_unlock(&nsInitMutex); + return NS_FAIL; + } + + topicAddr->serverAddr = NSGetQueryAddress(serverAddress); + topicAddr->topicName = OICStrdup(topicName); + + NSPushQueue(DISCOVERY_SCHEDULER, TASK_MQ_REQ_SUBSCRIBE, (void *) topicAddr); + + pthread_mutex_unlock(&nsInitMutex); + NS_LOG(DEBUG, "NSProviderSubscribeMQService - OUT"); + return NS_OK; +} +#endif + NSResult NSSendMessage(NSMessage * msg) { NS_LOG(DEBUG, "NSSendNotification - IN"); @@ -253,7 +287,7 @@ NSResult NSAcceptSubscription(const char * consumerId, bool accepted) } char * newConsumerId = OICStrdup(consumerId); - if(accepted) + if (accepted) { NS_LOG(DEBUG, "accepted is true - ALLOW"); NSPushQueue(SUBSCRIPTION_SCHEDULER, TASK_SEND_ALLOW, newConsumerId); @@ -387,7 +421,7 @@ NSResult NSProviderUnregisterTopic(const char * topicName) NSPushQueue(TOPIC_SCHEDULER, TASK_UNREGISTER_TOPIC, &topicSyncResult); pthread_cond_wait(topicSyncResult.condition, &nsInitMutex); - if(topicSyncResult.result != NS_OK) + if (topicSyncResult.result != NS_OK) { pthread_mutex_unlock(&nsInitMutex); return NS_FAIL; @@ -407,7 +441,7 @@ NSResult NSProviderSetConsumerTopic(const char * consumerId, const char * topicN NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *) OICMalloc(sizeof(NSCacheTopicSubData)); - if(!initProvider || !consumerId || consumerId[0] == '\0' || !topicName || topicName[0] == '\0' + if (!initProvider || !consumerId || consumerId[0] == '\0' || !topicName || topicName[0] == '\0' || !NSGetPolicy() || !topicSubData) { NS_LOG(DEBUG, "provider is not started or " @@ -442,7 +476,7 @@ NSResult NSProviderUnsetConsumerTopic(const char * consumerId, const char * topi NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *) OICMalloc(sizeof(NSCacheTopicSubData)); - if(!initProvider || !consumerId || consumerId[0] == '\0' || !topicName || topicName[0] == '\0' + if (!initProvider || !consumerId || consumerId[0] == '\0' || !topicName || topicName[0] == '\0' || !NSGetPolicy() || !topicSubData) { NS_LOG(DEBUG, "provider is not started or " diff --git a/service/notification/src/provider/NSProviderListener.c b/service/notification/src/provider/NSProviderListener.c index 1f3723f..e6c528f 100644 --- a/service/notification/src/provider/NSProviderListener.c +++ b/service/notification/src/provider/NSProviderListener.c @@ -325,6 +325,170 @@ OCEntityHandlerResult NSEntityHandlerTopicCb(OCEntityHandlerFlag flag, return ehResult; } +#ifdef WITH_MQ +OCStackApplicationResult NSProviderMQListener(void * ctx, OCDoHandle handle, + OCClientResponse * clientResponse) +{ + (void) ctx; + (void) handle; + + if (clientResponse->sequenceNumber == OC_OBSERVE_REGISTER) + { + NS_LOG(DEBUG, "MQ OC_OBSERVE_RIGSTER"); + NSSetMQServerInfo(clientResponse->resourceUri, &(clientResponse->devAddr)); + } + + NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION); + NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION); + NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION); + + NS_LOG(DEBUG, "income observe response of MQ notification"); + NS_LOG_V(DEBUG, "MQ OBS response income : %s:%d", + clientResponse->devAddr.addr, clientResponse->devAddr.port); + NS_LOG_V(DEBUG, "MQ OBS response result : %d", + clientResponse->result); + NS_LOG_V(DEBUG, "MQ OBS response sequenceNum : %d", + clientResponse->sequenceNumber); + NS_LOG_V(DEBUG, "MQ OBS response resource uri : %s", + clientResponse->resourceUri); + NS_LOG_V(DEBUG, "MQ OBS response Transport Type : %d", + clientResponse->devAddr.adapter); + + OCRepPayload * payload = NULL; + OCRepPayloadGetPropObject((OCRepPayload *)clientResponse->payload, NS_ATTRIBUTE_MQ_MESSAGE, + & payload); + NS_VERIFY_NOT_NULL(payload, OC_STACK_KEEP_TRANSACTION); + + char * pId = NULL; + bool getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, &pId); + NS_LOG_V (DEBUG, "provider id: %s", pId); + + if (strcmp(pId, NSGetProviderInfo()->providerId) == 0) + { + NSMessageType type = -1; + getResult = OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_TYPE, (int64_t *) &type); + + if (getResult && (type == NS_MESSAGE_READ || type == NS_MESSAGE_DELETED)) + { + NSSyncInfo * syncInfo = (NSSyncInfo *)OICMalloc(sizeof(NSSyncInfo)); + NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(syncInfo, + OC_STACK_KEEP_TRANSACTION, NSFreeSync(syncInfo)); + + bool getResult = OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, + (int64_t *) &(syncInfo->messageId)); + syncInfo->state = (type == NS_MESSAGE_READ) ? NS_SYNC_READ : NS_SYNC_DELETED; + OICStrcpy(syncInfo->providerId, NS_UUID_STRING_SIZE, pId); + + if (getResult) + { + NSPushQueue(NOTIFICATION_SCHEDULER, TASK_RECV_READ, (void*) syncInfo); + } + else + { + NSFreeSync(syncInfo); + } + } + } + + OCRepPayloadDestroy(payload); + return OC_STACK_KEEP_TRANSACTION; +} + +OCStackApplicationResult NSProviderIntrospectMQTopic(void * ctx, OCDoHandle handle, + OCClientResponse * clientResponse) +{ + (void) handle; + + NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION); + NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION); + NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION); + + NS_LOG(DEBUG, "income get response of MQ broker"); + NS_LOG_V(DEBUG, "MQ GET response income : %s:%d", + clientResponse->devAddr.addr, clientResponse->devAddr.port); + NS_LOG_V(DEBUG, "MQ GET response result : %d", + clientResponse->result); + NS_LOG_V(DEBUG, "MQ GET response sequenceNum : %d", + clientResponse->sequenceNumber); + NS_LOG_V(DEBUG, "MQ GET response resource uri : %s", + clientResponse->resourceUri); + NS_LOG_V(DEBUG, "MQ GET response Transport Type : %d", + clientResponse->devAddr.adapter); + + char ** topicList = NULL; + size_t dimensions[MAX_REP_ARRAY_DEPTH] = {0}; + OCRepPayloadGetStringArray((OCRepPayload *) clientResponse->payload, + NS_ATTIRBUTE_MQ_TOPICLIST, & topicList, dimensions); + + char * interestTopicName = (char *) ctx; + for (size_t i = 0; i < dimensions[0]; ++i) + { + NS_LOG_V(DEBUG, "found MQ topic : %s", topicList[i]); + if (!strcmp(topicList[i], interestTopicName)) + { + NS_LOG(DEBUG, "subscribe to MQ notification"); + + OCCallbackData cbdata = { NULL, NULL, NULL }; + cbdata.cb = NSProviderMQListener; + cbdata.context = NULL; + cbdata.cd = NULL; + + OCStackResult ret = OCDoResource(NULL, OC_REST_OBSERVE, topicList[i], + clientResponse->addr, NULL, CT_DEFAULT, OC_LOW_QOS, &cbdata, NULL, 0); + + if (!NSOCResultToSuccess(ret)) + { + NS_LOG(DEBUG, "fail to subscribe to MQ notification"); + continue; + } + } + } + + return OC_STACK_KEEP_TRANSACTION; +} + +OCStackApplicationResult NSProviderPublishTopicCB(void *ctx, OCDoHandle handle, + OCClientResponse *clientResponse) +{ + (void) ctx; + (void) handle; + NS_LOG(DEBUG, "Publish Topic callback received"); + + OCStackApplicationResult res = OC_STACK_ERROR; + + NS_LOG_V(DEBUG, "Publish Topic response received code: (%d)", clientResponse->result); + + if (clientResponse->payload != NULL && + clientResponse->payload->type == PAYLOAD_TYPE_REPRESENTATION) + { + NS_LOG(DEBUG, "PAYLOAD_TYPE_REPRESENTATION received"); + + OCRepPayloadValue *val = ((OCRepPayload *)clientResponse->payload)->values; + while (val) + { + if( val->type == OCREP_PROP_INT) + { + NS_LOG_V(DEBUG, "Key: %s, Value: %lld, int", val->name, val->i); + } + else if( val->type == OCREP_PROP_STRING) + { + NS_LOG_V(DEBUG, "Key: %s, Value: %s, string", val->name, val->str); + } + else + { + NS_LOG_V(DEBUG, "Un supported val Type.(0x%d)", val->type); + } + + val = val->next; + } + + res = OC_STACK_KEEP_TRANSACTION; + } + + return res; +} +#endif + void NSProviderConnectionStateListener(const CAEndpoint_t * info, bool connected) { NS_LOG(DEBUG, "NSProviderConnectionStateListener - IN"); diff --git a/service/notification/src/provider/NSProviderListener.h b/service/notification/src/provider/NSProviderListener.h index e8b8617..1c1e14a 100644 --- a/service/notification/src/provider/NSProviderListener.h +++ b/service/notification/src/provider/NSProviderListener.h @@ -51,4 +51,10 @@ void NSProviderConnectionStateListener(const CAEndpoint_t * info, bool isConnect void NSProviderAdapterStateListener(CATransportAdapter_t adapter, bool enabled); +OCStackApplicationResult NSProviderIntrospectMQTopic(void * ctx, OCDoHandle handle, + OCClientResponse * clientResponse); + +OCStackApplicationResult NSProviderPublishTopicCB(void *ctx, OCDoHandle handle, + OCClientResponse *clientResponse); + #endif /* _NS_PROVIDER_LISTENER__H_ */ diff --git a/service/notification/src/provider/NSProviderNotification.c b/service/notification/src/provider/NSProviderNotification.c index 23721bf..f069d7d 100644 --- a/service/notification/src/provider/NSProviderNotification.c +++ b/service/notification/src/provider/NSProviderNotification.c @@ -76,6 +76,31 @@ NSResult NSSetSyncPayload(NSSyncInfo *sync, OCRepPayload** syncPayload) return NS_OK; } +#ifdef WITH_MQ +OCStackResult NSProviderPublishTopic(OCRepPayload * payload, OCClientResponseHandler response) +{ + OCCallbackData cbData; + memset(&cbData, 0, sizeof(OCCallbackData)); + cbData.cb = response; + cbData.cd = NULL; + cbData.context = NULL; + + OCRepPayload *publishPayload = OCRepPayloadCreate(); + if (!publishPayload) + { + OCRepPayloadDestroy(publishPayload); + return OC_STACK_NO_MEMORY; + } + + OCRepPayloadSetPropObject(publishPayload, NS_ATTRIBUTE_MQ_MESSAGE, payload); + + NSMQServerInfo * serverInfo = NSGetMQServerInfo(); + return OCDoResource(NULL, OC_REST_POST, serverInfo->serverUri, serverInfo->devAddr, + (OCPayload *)publishPayload, + CT_ADAPTER_TCP, OC_LOW_QOS, &cbData, NULL, 0); +} +#endif + NSResult NSSendNotification(NSMessage *msg) { NS_LOG(DEBUG, "NSSendMessage - IN"); @@ -152,6 +177,13 @@ NSResult NSSendNotification(NSMessage *msg) } } #endif + +#ifdef WITH_MQ + if (!NSGetMQServerInfo()) + { + NSProviderPublishTopic(payload, NSProviderPublishTopicCB); + } +#endif } it = it->next; @@ -252,7 +284,6 @@ NSResult NSSendSync(NSSyncInfo *sync) obCount, payload, OC_LOW_QOS); NS_LOG_V(DEBUG, "Sync ocstackResult = %d", ocstackResult); - if (ocstackResult != OC_STACK_OK) { NS_LOG(ERROR, "fail to send Sync"); @@ -311,7 +342,6 @@ void * NSNotificationSchedule(void *ptr) } pthread_mutex_unlock(&NSMutex[NOTIFICATION_SCHEDULER]); - } NS_LOG(INFO, "Destroy NSNotificationSchedule"); diff --git a/service/notification/src/provider/NSProviderSubscription.c b/service/notification/src/provider/NSProviderSubscription.c index 6bfe3cd..8474de3 100644 --- a/service/notification/src/provider/NSProviderSubscription.c +++ b/service/notification/src/provider/NSProviderSubscription.c @@ -345,6 +345,29 @@ NSResult NSSendConsumerSubResponse(OCEntityHandlerRequest * entityHandlerRequest return NS_OK; } +#ifdef WITH_MQ +void NSProviderMQSubscription(NSMQTopicAddress * topicAddr) +{ + char * serverUri = topicAddr->serverAddr; + char * topicName = topicAddr->topicName; + + OCDevAddr * addr = NSChangeAddress(serverUri); + OCCallbackData cbdata = { NULL, NULL, NULL }; + cbdata.cb = NSProviderIntrospectMQTopic; + cbdata.context = topicName; + cbdata.cd = OICFree; + + OCStackResult ret = OCDoResource(NULL, OC_REST_GET, serverUri, addr, + NULL, CT_DEFAULT, OC_LOW_QOS, &cbdata, NULL, 0); + + NSOCResultToSuccess(ret); + + OICFree(topicAddr->serverAddr); + OICFree(topicAddr->topicName); + OICFree(topicAddr); +} +#endif + void * NSSubScriptionSchedule(void *ptr) { if (ptr == NULL) @@ -406,6 +429,12 @@ void * NSSubScriptionSchedule(void *ptr) NSHandleSubscription((OCEntityHandlerRequest*) node->taskData, NS_RESOURCE_SYNC); break; +#ifdef WITH_MQ + case TASK_MQ_REQ_SUBSCRIBE: + NS_LOG(DEBUG, "CASE TASK_MQ_REQ_SUBSCRIBE : "); + NSProviderMQSubscription((NSMQTopicAddress*) node->taskData); + break; +#endif default: break; diff --git a/service/notification/src/provider/NSProviderSystem.c b/service/notification/src/provider/NSProviderSystem.c index 5949350..0018737 100644 --- a/service/notification/src/provider/NSProviderSystem.c +++ b/service/notification/src/provider/NSProviderSystem.c @@ -24,6 +24,10 @@ static char NSRemoteServerAddress[MAX_SERVER_ADDRESS] = {0,}; #endif +#ifdef WITH_MQ +static NSMQServerInfo * mqServerInfo = NULL; +#endif + static NSConnectionState NSProviderConnectionState; NSProviderInfo * providerInfo; @@ -146,3 +150,21 @@ const char * NSGetUserInfo() { return providerInfo->userInfo; } + +#ifdef WITH_MQ +void NSSetMQServerInfo(const char * serverUri, OCDevAddr * devAddr) +{ + if (!mqServerInfo) + { + mqServerInfo = (NSMQServerInfo *)OICMalloc(sizeof(NSMQServerInfo)); + mqServerInfo->serverUri = OICStrdup(serverUri); + mqServerInfo->devAddr = (OCDevAddr *)OICMalloc(sizeof(OCDevAddr)); + memcpy(mqServerInfo->devAddr, devAddr, sizeof(OCDevAddr)); + } +} + +NSMQServerInfo * NSGetMQServerInfo() +{ + return mqServerInfo; +} +#endif diff --git a/service/notification/src/provider/NSProviderSystem.h b/service/notification/src/provider/NSProviderSystem.h index e382ddb..94c37dd 100644 --- a/service/notification/src/provider/NSProviderSystem.h +++ b/service/notification/src/provider/NSProviderSystem.h @@ -41,4 +41,9 @@ void NSSetPolicy(bool policy); bool NSGetResourceSecurity(); void NSSetResourceSecurity(bool secured); +#ifdef WITH_MQ +void NSSetMQServerInfo(const char * serverUri, OCDevAddr * devAddr); +NSMQServerInfo * NSGetMQServerInfo(); +#endif + #endif /* _NS_PROVIDER_SYSTEM__H_ */ -- 2.7.4