[IOT-1523] Add consumer module supported cloud MQ.
authorKIM JungYong <jyong2.kim@samsung.com>
Wed, 26 Oct 2016 06:05:53 +0000 (15:05 +0900)
committerUze Choi <uzchoi@samsung.com>
Thu, 17 Nov 2016 06:16:24 +0000 (06:16 +0000)
https://jira.iotivity.org/browse/IOT-1523
API about subscription to MQ notification is added in consumer service.
added api : NSConsumerSubscribeMQService(...)

For the use of this api, app developer shold find the MQ broker.
And then call the new api with MQ broker address/uri and topic name,
consumer service will receive message from MQ and synchronize with local and MQ message.

Change-Id: If544d33cbcd641ed4d68d1942450acad4c6e2cde
Signed-off-by: KIM JungYong <jyong2.kim@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/13695
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Uze Choi <uzchoi@samsung.com>
12 files changed:
service/notification/cpp-wrapper/examples/linux/notificationserviceprovider.cpp
service/notification/include/NSCommon.h
service/notification/include/NSConsumerInterface.h
service/notification/src/common/NSConstants.h
service/notification/src/consumer/NSConsumerCommon.c
service/notification/src/consumer/NSConsumerCommon.h
service/notification/src/consumer/NSConsumerCommunication.c
service/notification/src/consumer/NSConsumerDiscovery.c
service/notification/src/consumer/NSConsumerInterface.c
service/notification/src/consumer/NSConsumerMQPlugin.c [new file with mode: 0644]
service/notification/src/consumer/NSConsumerMQPlugin.h [new file with mode: 0644]
service/notification/src/consumer/NSConsumerScheduler.c

index 35fae95..2279ce5 100755 (executable)
@@ -21,6 +21,7 @@
 #include <iostream>\r
 #include <stdlib.h>\r
 #include <cstdint>\r
+#include <limits>\r
 #include "NSCommon.h"\r
 #include "NSProviderService.h"\r
 #include "NSUtils.h"\r
index 9f6e35c..ff4f231 100644 (file)
@@ -81,6 +81,8 @@ typedef enum
     NS_MESSAGE_NOTICE = 2,
     NS_MESSAGE_EVENT = 3,
     NS_MESSAGE_INFO = 4,
+    NS_MESSAGE_READ = 11,
+    NS_MESSAGE_DELETED = 12
 
 } NSMessageType;
 
index 42c1dcb..6a073c0 100644 (file)
@@ -82,7 +82,17 @@ NSResult NSStopConsumer();
  * @param[in] server address combined with IP address and port number using delimiter :
  * @return ::NS_OK or result code of NSResult
  */
-NSResult NSConsumerEnableRemoteService(const char *serverAddress);
+NSResult NSConsumerEnableRemoteService(const char * serverAddress);
+
+#ifdef WITH_MQ
+/**
+ * Request to subscribe to remote MQ address as parameter.
+ * @param[in] server address combined with IP address and port number and MQ broker uri using delimiter :
+ * @param[in] topicName the interest MQ Topic name for subscription.
+ * @return ::NS_OK or result code of NSResult
+ */
+NSResult NSConsumerSubscribeMQService(const char * serverAddress, const char * topicName);
+#endif
 
 /**
  * Request discovery manually
index 983f2a3..b86b93d 100644 (file)
@@ -225,6 +225,10 @@ typedef enum eSchedulerType
 
 typedef enum eTaskType
 {
+#ifdef WITH_MQ
+    TASK_MQ_REQ_SUBSCRIBE = 20001,
+#endif
+
     TASK_REGISTER_RESOURCE = 1000,
     TASK_PUBLISH_RESOURCE = 1001,
 
index a790c4b..104f2c6 100644 (file)
 
 #define NS_QUERY_CONSUMER_ID "consumerId"
 
+static bool NSIsExtraValue(const char * name);
+static void NSCopyPayloadValueArray(OCRepPayloadValue* dest, OCRepPayloadValue* source);
+static OCRepPayloadValue * NSCopyPayloadValue(OCRepPayloadValue * value);
+
 pthread_mutex_t ** NSGetStackMutex()
 {
     static pthread_mutex_t * g_stackMutext = NULL;
@@ -249,6 +253,131 @@ NSTask * NSMakeTask(NSTaskType type, void * data)
     return retTask;
 }
 
+static NSMessage * NSCreateMessage_internal(uint64_t id, const char * providerId)
+{
+    NSMessage * retMsg = (NSMessage *)OICMalloc(sizeof(NSMessage));
+    NS_VERIFY_NOT_NULL(retMsg, NULL);
+
+    retMsg->messageId = id;
+    OICStrcpy(retMsg->providerId, sizeof(char) * NS_DEVICE_ID_LENGTH, providerId);
+    retMsg->title = NULL;
+    retMsg->contentText = NULL;
+    retMsg->sourceName = NULL;
+    retMsg->topic = NULL;
+    retMsg->type = NS_MESSAGE_INFO;
+    retMsg->dateTime = NULL;
+    retMsg->ttl = 0;
+    retMsg->mediaContents = NULL;
+    retMsg->extraInfo = NULL;
+
+    return retMsg;
+}
+
+static OCRepPayload * NSGetExtraInfo(OCRepPayload * payload)
+{
+    NS_LOG(DEBUG, "get extra info");
+    OCRepPayload * extraInfo = OCRepPayloadCreate();
+    NS_VERIFY_NOT_NULL(extraInfo, NULL);
+    OCRepPayload * origin = OCRepPayloadClone(payload);
+
+    bool isFirstExtra = true;
+    OCRepPayloadValue * headValue = NULL;
+    OCRepPayloadValue * curValue = NULL;
+    OCRepPayloadValue * value = origin->values;
+    while(value)
+    {
+        if (NSIsExtraValue(value->name))
+        {
+            curValue = NSCopyPayloadValue(value);
+            NS_LOG_V(DEBUG, " key : %s", curValue->name);
+            if (isFirstExtra)
+            {
+                headValue = curValue;
+                extraInfo->values = headValue;
+                isFirstExtra = false;
+            }
+            else
+            {
+                headValue->next = curValue;
+                headValue = curValue;
+            }
+            curValue = NULL;
+        }
+        value = value->next;
+    }
+    OCRepPayloadDestroy(origin);
+
+
+    if (!isFirstExtra && extraInfo->values)
+    {
+        return extraInfo;
+    }
+    else
+    {
+        OCRepPayloadDestroy(extraInfo);
+        return NULL;
+    }
+}
+
+NSMessage * NSGetMessage(OCRepPayload * payload)
+{
+    NS_LOG(DEBUG, "get msg id");
+    uint64_t id = NULL;
+    bool getResult = OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, (int64_t *)&id);
+    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
+
+    NS_LOG(DEBUG, "get provider id");
+    char * pId = NULL;
+    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, &pId);
+    NS_LOG_V (DEBUG, "provider id: %s", pId);
+    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
+
+    NS_LOG(DEBUG, "create NSMessage");
+    NSMessage * retMsg = NSCreateMessage_internal(id, pId);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(retMsg, NULL, NSOICFree(pId));
+    NSOICFree(pId);
+
+    NS_LOG(DEBUG, "get msg optional field");
+    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TITLE, &retMsg->title);
+    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TEXT, &retMsg->contentText);
+    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_SOURCE, &retMsg->sourceName);
+    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TOPIC_NAME, &retMsg->topic);
+
+    OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_TYPE, (int64_t *)&retMsg->type);
+    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_DATETIME, &retMsg->dateTime);
+    OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_TTL, (int64_t *)&retMsg->ttl);
+
+    char * icon = NULL;
+    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_ICON_IMAGE, &icon);
+
+    if (icon && strlen(icon))
+    {
+        NSMediaContents * contents = (NSMediaContents *)OICMalloc(sizeof(NSMediaContents));
+        if (contents)
+        {
+            contents->iconImage = icon;
+            retMsg->mediaContents = contents;
+        }
+        else
+        {
+            NSOICFree(icon);
+        }
+    }
+
+    retMsg->extraInfo = NSGetExtraInfo(payload);
+
+    NS_LOG_V(DEBUG, "Msg ID      : %lld", (long long int)retMsg->messageId);
+    NS_LOG_V(DEBUG, "Msg Title   : %s", retMsg->title);
+    NS_LOG_V(DEBUG, "Msg Content : %s", retMsg->contentText);
+    NS_LOG_V(DEBUG, "Msg Source  : %s", retMsg->sourceName);
+    NS_LOG_V(DEBUG, "Msg Topic   : %s", retMsg->topic);
+    NS_LOG_V(DEBUG, "Msg Type    : %d", retMsg->type);
+    NS_LOG_V(DEBUG, "Msg Date    : %s", retMsg->dateTime);
+    NS_LOG_V(DEBUG, "Msg ttl     : %lld", (long long int)retMsg->ttl);
+
+    return retMsg;
+}
+
 NSMessage * NSCopyMessage(NSMessage * msg)
 {
     NS_VERIFY_NOT_NULL(msg, NULL);
@@ -320,6 +449,108 @@ void NSRemoveMessage(NSMessage * msg)
     NSOICFree(msg);
 }
 
+void NSGetProviderPostClean(
+        char * pId, char * mUri, char * sUri, char * tUri, NSProviderConnectionInfo * connection)
+{
+    NSOICFree(pId);
+    NSOICFree(mUri);
+    NSOICFree(sUri);
+    NSOICFree(tUri);
+    NSRemoveConnections(connection);
+}
+
+NSProvider_internal * NSGetProvider(OCClientResponse * clientResponse)
+{
+    NS_LOG(DEBUG, "create NSProvider");
+    NS_VERIFY_NOT_NULL(clientResponse->payload, NULL);
+
+    OCRepPayloadPropType accepterType = OCREP_PROP_BOOL;
+
+    OCRepPayload * payload = (OCRepPayload *)clientResponse->payload;
+    OCRepPayloadValue * value = payload->values;
+    while (value)
+    {
+        NS_LOG_V(DEBUG, "Payload Key : %s", value->name);
+        NS_LOG_V(DEBUG, "Payload Type : %d", (int) value->type);
+        if (!strcmp(value->name, NS_ATTRIBUTE_POLICY))
+        {
+            accepterType = value->type;
+        }
+        value = value->next;
+    }
+
+    char * providerId = NULL;
+    char * messageUri = NULL;
+    char * syncUri = NULL;
+    char * topicUri = NULL;
+    bool bAccepter = 0;
+    int64_t iAccepter = 0;
+    NSProviderConnectionInfo * connection = NULL;
+
+    NS_LOG(DEBUG, "get information of accepter");
+    bool getResult = false;
+    if (accepterType == OCREP_PROP_BOOL)
+    {
+        getResult = OCRepPayloadGetPropBool(payload, NS_ATTRIBUTE_POLICY, & bAccepter);
+    }
+    else if (accepterType == OCREP_PROP_INT)
+    {
+        getResult = OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_POLICY, & iAccepter);
+    }
+    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
+
+    NS_LOG(DEBUG, "get provider ID");
+    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, & providerId);
+    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
+
+    NS_LOG(DEBUG, "get message URI");
+    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_MESSAGE, & messageUri);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL, NULL,
+            NSGetProviderPostClean(providerId, messageUri, syncUri, topicUri, connection));
+
+    NS_LOG(DEBUG, "get sync URI");
+    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_SYNC, & syncUri);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL, NULL,
+            NSGetProviderPostClean(providerId, messageUri, syncUri, topicUri, connection));
+
+    NS_LOG(DEBUG, "get topic URI");
+    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TOPIC, & topicUri);
+
+    NS_LOG(DEBUG, "get provider connection information");
+    NS_VERIFY_NOT_NULL(clientResponse->addr, NULL);
+    connection = NSCreateProviderConnections(clientResponse->addr);
+    NS_VERIFY_NOT_NULL(connection, NULL);
+
+    NSProvider_internal * newProvider
+        = (NSProvider_internal *)OICMalloc(sizeof(NSProvider_internal));
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(newProvider, NULL,
+          NSGetProviderPostClean(providerId, messageUri, syncUri, topicUri, connection));
+
+    OICStrcpy(newProvider->providerId, sizeof(char) * NS_DEVICE_ID_LENGTH, providerId);
+    NSOICFree(providerId);
+    newProvider->messageUri = messageUri;
+    newProvider->syncUri = syncUri;
+    newProvider->topicUri = NULL;
+    if (topicUri && strlen(topicUri) > 0)
+    {
+        newProvider->topicUri = topicUri;
+    }
+    if (accepterType == OCREP_PROP_BOOL)
+    {
+        newProvider->accessPolicy = (NSSelector)bAccepter;
+    }
+    else if (accepterType == OCREP_PROP_INT)
+    {
+        newProvider->accessPolicy = (NSSelector)iAccepter;
+    }
+
+    newProvider->connection = connection;
+    newProvider->topicLL = NULL;
+    newProvider->state = NS_DISCOVERED;
+
+    return newProvider;
+}
+
 void NSRemoveConnections(NSProviderConnectionInfo * connections)
 {
     NS_VERIFY_NOT_NULL_V(connections);
@@ -596,3 +827,166 @@ bool NSOCResultToSuccess(OCStackResult ret)
             return false;
     }
 }
+
+OCDevAddr * NSChangeAddress(const char * address)
+{
+    NS_VERIFY_NOT_NULL(address, NULL);
+    OCDevAddr * retAddr = NULL;
+
+    int index = 0;
+    while(address[index] != '\0')
+    {
+        if (address[index] == ':')
+        {
+            break;
+        }
+        index++;
+    }
+
+    if (address[index] == '\0')
+    {
+        return NULL;
+    }
+
+    int tmp = index + 1;
+    uint16_t port = address[tmp++] - '0';
+
+    while(true)
+    {
+        if (address[tmp] == '\0' || address[tmp] > '9' || address[tmp] < '0')
+        {
+            break;
+        }
+        port *= 10;
+        port += address[tmp++] - '0';
+    }
+
+    retAddr = (OCDevAddr *) OICMalloc(sizeof(OCDevAddr));
+    NS_VERIFY_NOT_NULL(retAddr, NULL);
+
+    retAddr->adapter = OC_ADAPTER_TCP;
+    OICStrcpy(retAddr->addr, index + 1, address);
+    retAddr->addr[index] = '\0';
+    retAddr->port = port;
+    retAddr->flags = OC_IP_USE_V6;
+
+    NS_LOG(DEBUG, "Change Address for TCP request");
+    NS_LOG_V(DEBUG, "Origin : %s", address);
+    NS_LOG_V(DEBUG, "Changed Addr : %s", retAddr->addr);
+    NS_LOG_V(DEBUG, "Changed Port : %d", retAddr->port);
+
+    return retAddr;
+}
+
+bool NSIsExtraValue(const char * name)
+{
+    if (!strcmp(name, NS_ATTRIBUTE_MESSAGE_ID) ||
+        !strcmp(name, NS_ATTRIBUTE_PROVIDER_ID) ||
+        !strcmp(name, NS_ATTRIBUTE_TITLE) ||
+        !strcmp(name, NS_ATTRIBUTE_TEXT) ||
+        !strcmp(name, NS_ATTRIBUTE_SOURCE) ||
+        !strcmp(name, NS_ATTRIBUTE_TOPIC_NAME) ||
+        !strcmp(name, NS_ATTRIBUTE_TYPE) ||
+        !strcmp(name, NS_ATTRIBUTE_DATETIME) ||
+        !strcmp(name, NS_ATTRIBUTE_TTL) ||
+        !strcmp(name, NS_ATTRIBUTE_ICON_IMAGE))
+    {
+        return false;
+    }
+
+    return true;
+}
+
+
+void NSCopyPayloadValueArray(OCRepPayloadValue* dest, OCRepPayloadValue* source)
+{
+    NS_VERIFY_NOT_NULL_V(source);
+
+    size_t dimTotal = calcDimTotal(source->arr.dimensions);
+    switch(source->arr.type)
+    {
+        case OCREP_PROP_INT:
+            dest->arr.iArray = (int64_t*)OICMalloc(dimTotal * sizeof(int64_t));
+            NS_VERIFY_NOT_NULL_V(dest->arr.iArray);
+            memcpy(dest->arr.iArray, source->arr.iArray, dimTotal * sizeof(int64_t));
+            break;
+        case OCREP_PROP_DOUBLE:
+            dest->arr.dArray = (double*)OICMalloc(dimTotal * sizeof(double));
+            NS_VERIFY_NOT_NULL_V(dest->arr.dArray);
+            memcpy(dest->arr.dArray, source->arr.dArray, dimTotal * sizeof(double));
+            break;
+        case OCREP_PROP_BOOL:
+            dest->arr.bArray = (bool*)OICMalloc(dimTotal * sizeof(bool));
+            NS_VERIFY_NOT_NULL_V(dest->arr.bArray);
+            memcpy(dest->arr.bArray, source->arr.bArray, dimTotal * sizeof(bool));
+            break;
+        case OCREP_PROP_STRING:
+            dest->arr.strArray = (char**)OICMalloc(dimTotal * sizeof(char*));
+            NS_VERIFY_NOT_NULL_V(dest->arr.strArray);
+            for(size_t i = 0; i < dimTotal; ++i)
+            {
+                dest->arr.strArray[i] = OICStrdup(source->arr.strArray[i]);
+            }
+            break;
+        case OCREP_PROP_OBJECT:
+            dest->arr.objArray = (OCRepPayload**)OICMalloc(dimTotal * sizeof(OCRepPayload*));
+            NS_VERIFY_NOT_NULL_V(dest->arr.objArray);
+            for(size_t i = 0; i < dimTotal; ++i)
+            {
+                dest->arr.objArray[i] = OCRepPayloadClone(source->arr.objArray[i]);
+            }
+            break;
+        case OCREP_PROP_ARRAY:
+            dest->arr.objArray = (OCRepPayload**)OICMalloc(dimTotal * sizeof(OCRepPayload*));
+            NS_VERIFY_NOT_NULL_V(dest->arr.objArray);
+            for(size_t i = 0; i < dimTotal; ++i)
+            {
+                dest->arr.objArray[i] = OCRepPayloadClone(source->arr.objArray[i]);
+            }
+            break;
+        case OCREP_PROP_BYTE_STRING:
+            dest->arr.ocByteStrArray = (OCByteString*)OICMalloc(dimTotal * sizeof(OCByteString));
+            NS_VERIFY_NOT_NULL_V(dest->arr.ocByteStrArray);
+            for (size_t i = 0; i < dimTotal; ++i)
+            {
+                OCByteStringCopy(&dest->arr.ocByteStrArray[i], &source->arr.ocByteStrArray[i]);
+                NS_VERIFY_NOT_NULL_V(dest->arr.ocByteStrArray[i].bytes);
+            }
+            break;
+        default:
+            break;
+    }
+}
+
+OCRepPayloadValue * NSCopyPayloadValue(OCRepPayloadValue * value)
+{
+    OCRepPayloadValue * retValue = (OCRepPayloadValue *)OICMalloc(sizeof(OCRepPayloadValue));
+    NS_VERIFY_NOT_NULL(retValue, NULL);
+
+    * retValue = * value;
+    retValue->next = NULL;
+    retValue->name = OICStrdup(value->name);
+
+    switch(value->type)
+    {
+        case OCREP_PROP_STRING:
+            retValue->str = OICStrdup(value->str);
+            break;
+        case OCREP_PROP_BYTE_STRING:
+            retValue->ocByteStr.bytes = (uint8_t * )OICMalloc(value->ocByteStr.len * sizeof(uint8_t));
+            NS_VERIFY_NOT_NULL(retValue->ocByteStr.bytes, NULL);
+            retValue->ocByteStr.len = value->ocByteStr.len;
+            memcpy(retValue->ocByteStr.bytes, value->ocByteStr.bytes, retValue->ocByteStr.len);
+            break;
+        case OCREP_PROP_OBJECT:
+            retValue->obj = OCRepPayloadClone(value->obj);
+            break;
+        case OCREP_PROP_ARRAY:
+            NSCopyPayloadValueArray(retValue, value);
+            break;
+        default:
+            break;
+    }
+
+    return retValue;
+}
index 121766d..05e87fe 100644 (file)
@@ -96,6 +96,14 @@ typedef struct
 
 } NSSyncInfo_internal;
 
+#ifdef WITH_MQ
+typedef struct
+{
+    char * serverAddr;
+    char * topicName;
+} NSMQTopicAddress;
+#endif
+
 bool NSIsStartedConsumer();
 void NSSetIsStartedConsumer(bool setValue);
 
@@ -114,12 +122,16 @@ void NSSetConsumerId(char * cId);
 char * NSMakeRequestUriWithConsumerId(const char * uri);
 
 NSTask * NSMakeTask(NSTaskType, void *);
-
 NSResult NSConsumerPushEvent(NSTask *);
 
+NSMessage * NSGetMessage(OCRepPayload * payload);
 NSMessage * NSCopyMessage(NSMessage *);
 void NSRemoveMessage(NSMessage *);
 
+void NSGetProviderPostClean(
+        char * pId, char * mUri, char * sUri, char * tUri, NSProviderConnectionInfo * connection);
+
+NSProvider_internal * NSGetProvider(OCClientResponse * clientResponse);
 NSProviderConnectionInfo * NSCreateProviderConnections(OCDevAddr *);
 NSProviderConnectionInfo * NSCopyProviderConnections(NSProviderConnectionInfo *);
 void NSRemoveConnections(NSProviderConnectionInfo *);
@@ -144,6 +156,8 @@ OCStackResult NSInvokeRequest(OCDoHandle * handle,
 
 bool NSOCResultToSuccess(OCStackResult ret);
 
+OCDevAddr * NSChangeAddress(const char * address);
+
 #ifdef __cplusplus
 }
 #endif // __cplusplus
index 4d4b62a..11e4182 100644 (file)
 
 #define NS_SYNC_URI "/notification/sync"
 
-NSMessage * NSCreateMessage_internal(uint64_t msgId, const char * providerId);
 NSSyncInfo * NSCreateSyncInfo_consumer(uint64_t msgId, const char * providerId, NSSyncType state);
 
-NSMessage * NSGetMessage(OCClientResponse * clientResponse);
 NSSyncInfo * NSGetSyncInfoc(OCClientResponse * clientResponse);
 NSTopicLL * NSGetTopicLL(OCClientResponse * clientResponse);
 
@@ -184,7 +182,7 @@ OCStackApplicationResult NSConsumerMessageListener(
     NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
 
     NS_LOG(DEBUG, "build NSMessage");
-    NSMessage * newNoti = NSGetMessage(clientResponse);
+    NSMessage * newNoti = NSGetMessage((OCRepPayload *) clientResponse->payload);
     NS_VERIFY_NOT_NULL(newNoti, OC_STACK_KEEP_TRANSACTION);
 
     NSTaskType type = TASK_CONSUMER_RECV_MESSAGE;
@@ -219,226 +217,6 @@ void NSGetMessagePostClean(char * pId, OCDevAddr * addr)
     NSOICFree(addr);
 }
 
-bool NSIsExtraValue(const char * name)
-{
-    if (!strcmp(name, NS_ATTRIBUTE_MESSAGE_ID) ||
-        !strcmp(name, NS_ATTRIBUTE_PROVIDER_ID) ||
-        !strcmp(name, NS_ATTRIBUTE_TITLE) ||
-        !strcmp(name, NS_ATTRIBUTE_TEXT) ||
-        !strcmp(name, NS_ATTRIBUTE_SOURCE) ||
-        !strcmp(name, NS_ATTRIBUTE_TOPIC_NAME) ||
-        !strcmp(name, NS_ATTRIBUTE_TYPE) ||
-        !strcmp(name, NS_ATTRIBUTE_DATETIME) ||
-        !strcmp(name, NS_ATTRIBUTE_TTL) ||
-        !strcmp(name, NS_ATTRIBUTE_ICON_IMAGE))
-    {
-        return false;
-    }
-
-    return true;
-}
-
-void NSCopyPayloadValueArray(OCRepPayloadValue* dest, OCRepPayloadValue* source)
-{
-    NS_VERIFY_NOT_NULL_V(source);
-
-    size_t dimTotal = calcDimTotal(source->arr.dimensions);
-    switch(source->arr.type)
-    {
-        case OCREP_PROP_INT:
-            dest->arr.iArray = (int64_t*)OICMalloc(dimTotal * sizeof(int64_t));
-            NS_VERIFY_NOT_NULL_V(dest->arr.iArray);
-            memcpy(dest->arr.iArray, source->arr.iArray, dimTotal * sizeof(int64_t));
-            break;
-        case OCREP_PROP_DOUBLE:
-            dest->arr.dArray = (double*)OICMalloc(dimTotal * sizeof(double));
-            NS_VERIFY_NOT_NULL_V(dest->arr.dArray);
-            memcpy(dest->arr.dArray, source->arr.dArray, dimTotal * sizeof(double));
-            break;
-        case OCREP_PROP_BOOL:
-            dest->arr.bArray = (bool*)OICMalloc(dimTotal * sizeof(bool));
-            NS_VERIFY_NOT_NULL_V(dest->arr.bArray);
-            memcpy(dest->arr.bArray, source->arr.bArray, dimTotal * sizeof(bool));
-            break;
-        case OCREP_PROP_STRING:
-            dest->arr.strArray = (char**)OICMalloc(dimTotal * sizeof(char*));
-            NS_VERIFY_NOT_NULL_V(dest->arr.strArray);
-            for(size_t i = 0; i < dimTotal; ++i)
-            {
-                dest->arr.strArray[i] = OICStrdup(source->arr.strArray[i]);
-            }
-            break;
-        case OCREP_PROP_OBJECT:
-            dest->arr.objArray = (OCRepPayload**)OICMalloc(dimTotal * sizeof(OCRepPayload*));
-            NS_VERIFY_NOT_NULL_V(dest->arr.objArray);
-            for(size_t i = 0; i < dimTotal; ++i)
-            {
-                dest->arr.objArray[i] = OCRepPayloadClone(source->arr.objArray[i]);
-            }
-            break;
-        case OCREP_PROP_ARRAY:
-            dest->arr.objArray = (OCRepPayload**)OICMalloc(dimTotal * sizeof(OCRepPayload*));
-            NS_VERIFY_NOT_NULL_V(dest->arr.objArray);
-            for(size_t i = 0; i < dimTotal; ++i)
-            {
-                dest->arr.objArray[i] = OCRepPayloadClone(source->arr.objArray[i]);
-            }
-            break;
-        case OCREP_PROP_BYTE_STRING:
-            dest->arr.ocByteStrArray = (OCByteString*)OICMalloc(dimTotal * sizeof(OCByteString));
-            NS_VERIFY_NOT_NULL_V(dest->arr.ocByteStrArray);
-            for (size_t i = 0; i < dimTotal; ++i)
-            {
-                OCByteStringCopy(&dest->arr.ocByteStrArray[i], &source->arr.ocByteStrArray[i]);
-                NS_VERIFY_NOT_NULL_V(dest->arr.ocByteStrArray[i].bytes);
-            }
-            break;
-        default:
-            break;
-    }
-}
-
-OCRepPayloadValue * NSCopyPayloadValue(OCRepPayloadValue * value)
-{
-    OCRepPayloadValue * retValue = (OCRepPayloadValue *)OICMalloc(sizeof(OCRepPayloadValue));
-    NS_VERIFY_NOT_NULL(retValue, NULL);
-
-    * retValue = * value;
-    retValue->next = NULL;
-    retValue->name = OICStrdup(value->name);
-
-    switch(value->type)
-    {
-        case OCREP_PROP_STRING:
-            retValue->str = OICStrdup(value->str);
-            break;
-        case OCREP_PROP_BYTE_STRING:
-            retValue->ocByteStr.bytes = (uint8_t * )OICMalloc(value->ocByteStr.len * sizeof(uint8_t));
-            NS_VERIFY_NOT_NULL(retValue->ocByteStr.bytes, NULL);
-            retValue->ocByteStr.len = value->ocByteStr.len;
-            memcpy(retValue->ocByteStr.bytes, value->ocByteStr.bytes, retValue->ocByteStr.len);
-            break;
-        case OCREP_PROP_OBJECT:
-            retValue->obj = OCRepPayloadClone(value->obj);
-            break;
-        case OCREP_PROP_ARRAY:
-            NSCopyPayloadValueArray(retValue, value);
-            break;
-        default:
-            break;
-    }
-
-    return retValue;
-}
-
-OCRepPayload * NSGetExtraInfo(OCRepPayload * payload)
-{
-    NS_LOG(DEBUG, "get extra info");
-    OCRepPayload * extraInfo = OCRepPayloadCreate();
-    NS_VERIFY_NOT_NULL(extraInfo, NULL);
-    OCRepPayload * origin = OCRepPayloadClone(payload);
-
-    bool isFirstExtra = true;
-    OCRepPayloadValue * headValue = NULL;
-    OCRepPayloadValue * curValue = NULL;
-    OCRepPayloadValue * value = origin->values;
-    while(value)
-    {
-        if (NSIsExtraValue(value->name))
-        {
-            curValue = NSCopyPayloadValue(value);
-            NS_LOG_V(DEBUG, " key : %s", curValue->name);
-            if (isFirstExtra)
-            {
-                headValue = curValue;
-                extraInfo->values = headValue;
-                isFirstExtra = false;
-            }
-            else
-            {
-                headValue->next = curValue;
-                headValue = curValue;
-            }
-            curValue = NULL;
-        }
-        value = value->next;
-    }
-    OCRepPayloadDestroy(origin);
-
-
-    if (!isFirstExtra && extraInfo->values)
-    {
-        return extraInfo;
-    }
-    else
-    {
-        OCRepPayloadDestroy(extraInfo);
-        return NULL;
-    }
-}
-
-NSMessage * NSGetMessage(OCClientResponse * clientResponse)
-{
-    NS_VERIFY_NOT_NULL(clientResponse->payload, NULL);
-    OCRepPayload * payload = (OCRepPayload *)clientResponse->payload;
-
-    NS_LOG(DEBUG, "get msg id");
-    uint64_t id = NULL;
-    bool getResult = OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, (int64_t *)&id);
-    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
-
-    NS_LOG(DEBUG, "get provider id");
-    char * pId = NULL;
-    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, &pId);
-    NS_LOG_V (DEBUG, "provider id: %s", pId);
-    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
-
-    NS_LOG(DEBUG, "create NSMessage");
-    NSMessage * retMsg = NSCreateMessage_internal(id, pId);
-    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(retMsg, NULL, NSOICFree(pId));
-    NSOICFree(pId);
-
-    NS_LOG(DEBUG, "get msg optional field");
-    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TITLE, &retMsg->title);
-    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TEXT, &retMsg->contentText);
-    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_SOURCE, &retMsg->sourceName);
-    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TOPIC_NAME, &retMsg->topic);
-
-    OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_TYPE, (int64_t *)&retMsg->type);
-    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_DATETIME, &retMsg->dateTime);
-    OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_TTL, (int64_t *)&retMsg->ttl);
-
-    char * icon = NULL;
-    OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_ICON_IMAGE, &icon);
-
-    if (icon && strlen(icon))
-    {
-        NSMediaContents * contents = (NSMediaContents *)OICMalloc(sizeof(NSMediaContents));
-        if (contents)
-        {
-            contents->iconImage = icon;
-            retMsg->mediaContents = contents;
-        }
-        else
-        {
-            NSOICFree(icon);
-        }
-    }
-
-    retMsg->extraInfo = NSGetExtraInfo(payload);
-
-    NS_LOG_V(DEBUG, "Msg ID      : %lld", (long long int)retMsg->messageId);
-    NS_LOG_V(DEBUG, "Msg Title   : %s", retMsg->title);
-    NS_LOG_V(DEBUG, "Msg Content : %s", retMsg->contentText);
-    NS_LOG_V(DEBUG, "Msg Source  : %s", retMsg->sourceName);
-    NS_LOG_V(DEBUG, "Msg Topic   : %s", retMsg->topic);
-    NS_LOG_V(DEBUG, "Msg Type    : %d", retMsg->type);
-    NS_LOG_V(DEBUG, "Msg Date    : %s", retMsg->dateTime);
-    NS_LOG_V(DEBUG, "Msg ttl     : %lld", (long long int)retMsg->ttl);
-
-    return retMsg;
-}
-
 NSSyncInfo * NSGetSyncInfoc(OCClientResponse * clientResponse)
 {
     NS_VERIFY_NOT_NULL(clientResponse->payload, NULL);
@@ -473,26 +251,6 @@ NSSyncInfo * NSGetSyncInfoc(OCClientResponse * clientResponse)
     return retSync;
 }
 
-NSMessage * NSCreateMessage_internal(uint64_t id, const char * providerId)
-{
-    NSMessage * retMsg = (NSMessage *)OICMalloc(sizeof(NSMessage));
-    NS_VERIFY_NOT_NULL(retMsg, NULL);
-
-    retMsg->messageId = id;
-    OICStrcpy(retMsg->providerId, sizeof(char) * NS_DEVICE_ID_LENGTH, providerId);
-    retMsg->title = NULL;
-    retMsg->contentText = NULL;
-    retMsg->sourceName = NULL;
-    retMsg->topic = NULL;
-    retMsg->type = NS_MESSAGE_INFO;
-    retMsg->dateTime = NULL;
-    retMsg->ttl = 0;
-    retMsg->mediaContents = NULL;
-    retMsg->extraInfo = NULL;
-
-    return retMsg;
-}
-
 NSSyncInfo * NSCreateSyncInfo_consumer(uint64_t msgId, const char * providerId, NSSyncType state)
 {
     NS_VERIFY_NOT_NULL(providerId, NULL);
index 86ce82d..ae1eb29 100644 (file)
 #define NS_DISCOVER_QUERY "/oic/res?rt=oic.wk.notification"
 #define NS_PRESENCE_SUBSCRIBE_QUERY_TCP "/oic/ad?rt=oic.wk.notification"
 
-NSProvider_internal * NSGetProvider(OCClientResponse * clientResponse);
-
-OCDevAddr * NSChangeAddress(const char * address);
-
 OCStackApplicationResult NSConsumerPresenceListener(
         void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
 {
@@ -181,154 +177,6 @@ OCStackApplicationResult NSIntrospectProvider(
     return OC_STACK_KEEP_TRANSACTION;
 }
 
-void NSGetProviderPostClean(
-        char * pId, char * mUri, char * sUri, char * tUri, NSProviderConnectionInfo * connection)
-{
-    NSOICFree(pId);
-    NSOICFree(mUri);
-    NSOICFree(sUri);
-    NSOICFree(tUri);
-    NSRemoveConnections(connection);
-}
-
-NSProvider_internal * NSGetProvider(OCClientResponse * clientResponse)
-{
-    NS_LOG(DEBUG, "create NSProvider");
-    NS_VERIFY_NOT_NULL(clientResponse->payload, NULL);
-
-    OCRepPayloadPropType accepterType = OCREP_PROP_BOOL;
-
-    OCRepPayload * payload = (OCRepPayload *)clientResponse->payload;
-    OCRepPayloadValue * value = payload->values;
-    while (value)
-    {
-        NS_LOG_V(DEBUG, "Payload Key : %s", value->name);
-        NS_LOG_V(DEBUG, "Payload Type : %d", (int) value->type);
-        if (!strcmp(value->name, NS_ATTRIBUTE_POLICY))
-        {
-            accepterType = value->type;
-        }
-        value = value->next;
-    }
-
-    char * providerId = NULL;
-    char * messageUri = NULL;
-    char * syncUri = NULL;
-    char * topicUri = NULL;
-    bool bAccepter = 0;
-    int64_t iAccepter = 0;
-    NSProviderConnectionInfo * connection = NULL;
-
-    NS_LOG(DEBUG, "get information of accepter");
-    bool getResult = false;
-    if (accepterType == OCREP_PROP_BOOL)
-    {
-        getResult = OCRepPayloadGetPropBool(payload, NS_ATTRIBUTE_POLICY, & bAccepter);
-    }
-    else if (accepterType == OCREP_PROP_INT)
-    {
-        getResult = OCRepPayloadGetPropInt(payload, NS_ATTRIBUTE_POLICY, & iAccepter);
-    }
-    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
-
-    NS_LOG(DEBUG, "get provider ID");
-    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, & providerId);
-    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
-
-    NS_LOG(DEBUG, "get message URI");
-    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_MESSAGE, & messageUri);
-    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL, NULL,
-            NSGetProviderPostClean(providerId, messageUri, syncUri, topicUri, connection));
-
-    NS_LOG(DEBUG, "get sync URI");
-    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_SYNC, & syncUri);
-    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL, NULL,
-            NSGetProviderPostClean(providerId, messageUri, syncUri, topicUri, connection));
-
-    NS_LOG(DEBUG, "get topic URI");
-    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TOPIC, & topicUri);
-
-    NS_LOG(DEBUG, "get provider connection information");
-    NS_VERIFY_NOT_NULL(clientResponse->addr, NULL);
-    connection = NSCreateProviderConnections(clientResponse->addr);
-    NS_VERIFY_NOT_NULL(connection, NULL);
-
-    NSProvider_internal * newProvider
-        = (NSProvider_internal *)OICMalloc(sizeof(NSProvider_internal));
-    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(newProvider, NULL,
-          NSGetProviderPostClean(providerId, messageUri, syncUri, topicUri, connection));
-
-    OICStrcpy(newProvider->providerId, sizeof(char) * NS_DEVICE_ID_LENGTH, providerId);
-    NSOICFree(providerId);
-    newProvider->messageUri = messageUri;
-    newProvider->syncUri = syncUri;
-    newProvider->topicUri = NULL;
-    if (topicUri && strlen(topicUri) > 0)
-    {
-        newProvider->topicUri = topicUri;
-    }
-    if (accepterType == OCREP_PROP_BOOL)
-    {
-        newProvider->accessPolicy = (NSSelector)bAccepter;
-    }
-    else if (accepterType == OCREP_PROP_INT)
-    {
-        newProvider->accessPolicy = (NSSelector)iAccepter;
-    }
-
-    newProvider->connection = connection;
-    newProvider->topicLL = NULL;
-    newProvider->state = NS_DISCOVERED;
-
-    return newProvider;
-}
-
-OCDevAddr * NSChangeAddress(const char * address)
-{
-    NS_VERIFY_NOT_NULL(address, NULL);
-    OCDevAddr * retAddr = NULL;
-
-    int index = 0;
-    while(address[index] != '\0')
-    {
-        if (address[index] == ':')
-        {
-            break;
-        }
-        index++;
-    }
-
-    if (address[index] == '\0')
-    {
-        return NULL;
-    }
-
-    int tmp = index + 1;
-    uint16_t port = address[tmp++] - '0';
-
-    while(address[tmp] != '\0')
-    {
-        port *= 10;
-        port += address[tmp++] - '0';
-    }
-
-    retAddr = (OCDevAddr *) OICMalloc(sizeof(OCDevAddr));
-    NS_VERIFY_NOT_NULL(retAddr, NULL);
-
-    retAddr->adapter = OC_ADAPTER_TCP;
-    OICStrcpy(retAddr->addr, index + 1, address);
-    retAddr->addr[index] = '\0';
-    retAddr->port = port;
-    retAddr->flags = OC_IP_USE_V6;
-
-    NS_LOG(DEBUG, "Change Address for TCP request");
-    NS_LOG_V(DEBUG, "Origin : %s", address);
-    NS_LOG_V(DEBUG, "Changed Addr : %s", retAddr->addr);
-    NS_LOG_V(DEBUG, "Changed Port : %d", retAddr->port);
-
-    return retAddr;
-}
-
 void NSConsumerHandleRequestDiscover(OCDevAddr * address, NSConsumerDiscoverType rType)
 {
     OCConnectivityType type = CT_ADAPTER_IP;
index db95e81..9eb7bc5 100644 (file)
@@ -30,6 +30,8 @@
 #include "oic_malloc.h"
 #include "oic_string.h"
 
+static char * NSGetQueryAddress(const char * serverFullAddress);
+
 // Public APIs
 NSResult NSStartConsumer(NSConsumerConfig config)
 {
@@ -67,25 +69,42 @@ NSResult NSStopConsumer()
     return NS_OK;
 }
 
-NSResult NSConsumerEnableRemoteService(const char *serverAddress)
+#ifdef WITH_MQ
+NSResult NSConsumerSubscribeMQService(const char * serverAddress, const char * topicName)
 {
     NS_VERIFY_NOT_NULL(serverAddress, NS_ERROR);
+    NS_VERIFY_NOT_NULL(topicName, NS_ERROR);
     bool isStartedConsumer = NSIsStartedConsumer();
     NS_VERIFY_NOT_NULL(isStartedConsumer == true ? (void *) 1 : NULL, NS_ERROR);
 
-    char * queryAddr = NULL;
-    if (strstr(serverAddress, "coap+tcp://"))
-    {
-        queryAddr = OICStrdup(serverAddress+11);
-    }
-    else if (strstr(serverAddress, "coap://"))
-    {
-        queryAddr = OICStrdup(serverAddress+7);
-    }
-    else
-    {
-        queryAddr = OICStrdup(serverAddress);
-    }
+    char * queryAddr = NSGetQueryAddress(serverAddress);
+    NS_VERIFY_NOT_NULL(queryAddr, NS_ERROR);
+
+    NSMQTopicAddress * topicAddr = (NSMQTopicAddress *)OICMalloc(sizeof(NSMQTopicAddress));
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(queryAddr, NS_ERROR, NSOICFree(queryAddr));
+
+    topicAddr->serverAddr = queryAddr;
+    topicAddr->topicName = OICStrdup(topicName);
+
+    NSTask * subMQTask = NSMakeTask(TASK_MQ_REQ_SUBSCRIBE, (void *)topicAddr);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(subMQTask, NS_ERROR,
+                  {
+                      NSOICFree(topicAddr->serverAddr);
+                      NSOICFree(topicAddr->topicName)
+                      NSOICFree(topicAddr);
+                  });
+
+    return NSConsumerPushEvent(subMQTask);
+}
+#endif
+
+NSResult NSConsumerEnableRemoteService(const char * serverAddress)
+{
+    NS_VERIFY_NOT_NULL(serverAddress, NS_ERROR);
+    bool isStartedConsumer = NSIsStartedConsumer();
+    NS_VERIFY_NOT_NULL(isStartedConsumer == true ? (void *) 1 : NULL, NS_ERROR);
+
+    char * queryAddr = NSGetQueryAddress(serverAddress);
     NS_VERIFY_NOT_NULL(queryAddr, NS_ERROR);
 
     NSTask * discoverTask = NSMakeTask(TASK_CONSUMER_REQ_DISCOVER, (void *)queryAddr);
@@ -222,3 +241,19 @@ NSResult NSConsumerUpdateTopicList(const char * providerId, NSTopicLL * topics)
 
     return NSConsumerPushEvent(topicTask);
 }
+
+char * NSGetQueryAddress(const char * serverFullAddress)
+{
+    if (strstr(serverFullAddress, "coap+tcp://"))
+    {
+        return OICStrdup(serverFullAddress+11);
+    }
+    else if (strstr(serverFullAddress, "coap://"))
+    {
+        return OICStrdup(serverFullAddress+7);
+    }
+    else
+    {
+        return OICStrdup(serverFullAddress);
+    }
+}
diff --git a/service/notification/src/consumer/NSConsumerMQPlugin.c b/service/notification/src/consumer/NSConsumerMQPlugin.c
new file mode 100644 (file)
index 0000000..3012f33
--- /dev/null
@@ -0,0 +1,208 @@
+//******************************************************************
+//
+// Copyright 2016 Samsung Electronics All Rights Reserved.
+//
+//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+#ifdef WITH_MQ
+#include "NSConstants.h"
+#include "NSConsumerCommon.h"
+#include "NSConsumerMQPlugin.h"
+
+#include "oic_malloc.h"
+#include "oic_string.h"
+#include "ocpayload.h"
+
+#define NS_ATTRIBUTE_MQ_MESSAGE "message"
+#define NS_ATTIRBUTE_MQ_TOPICLIST "topiclist"
+
+void NSHandleMQSubscription(NSMQTopicAddress * address);
+
+OCStackApplicationResult NSConsumerIntrospectMQTopic(
+        void * ctx, OCDoHandle handle, OCClientResponse * clientResponse);
+
+OCStackApplicationResult NSConsumerMQListener(
+        void * ctx, OCDoHandle handle, OCClientResponse * clientResponse);
+
+void NSConsumerMQTaskProcessing(NSTask * task)
+{
+    NS_VERIFY_NOT_NULL_V(task);
+
+    NS_LOG_V(DEBUG, "Receive Event : %d", (int) task->taskType);
+
+    switch (task->taskType)
+    {
+        case TASK_MQ_REQ_SUBSCRIBE:
+        {
+            NSMQTopicAddress * mqTopic = task->taskData;
+            NSHandleMQSubscription(mqTopic);
+            NSOICFree(mqTopic);
+            break;
+        }
+        default:
+        {
+            NS_LOG(ERROR, "Unknown type of task");
+            break;
+        }
+    }
+
+    NSOICFree(task);
+}
+
+void NSHandleMQSubscription(NSMQTopicAddress * topicAddr)
+{
+    char * serverUri = topicAddr->serverAddr;
+    char * topicName = topicAddr->topicName;
+
+    OCDevAddr * addr = NSChangeAddress(serverUri);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(addr,
+                    {
+                        NSOICFree(topicAddr->serverAddr);
+                        NSOICFree(topicAddr->topicName);
+                    });
+    OCStackResult ret = NSInvokeRequest(NULL, OC_REST_GET, addr, serverUri, NULL,
+                      NSConsumerIntrospectMQTopic, topicName, OICFree, CT_DEFAULT);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(NSOCResultToSuccess(ret) == true ? (void *)1 : NULL,
+                   {
+                       NSOICFree(serverUri);
+                       NSOICFree(topicName);
+                   });
+
+    NSOICFree(serverUri);
+}
+
+OCStackApplicationResult NSConsumerIntrospectMQTopic(
+        void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
+{
+    (void) handle;
+
+    if (!NSIsStartedConsumer())
+    {
+        return OC_STACK_DELETE_TRANSACTION;
+    }
+
+    NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
+    NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
+    NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
+
+    NS_LOG(DEBUG, "income get response of MQ broker");
+    NS_LOG_V(DEBUG, "MQ GET response income : %s:%d",
+            clientResponse->devAddr.addr, clientResponse->devAddr.port);
+    NS_LOG_V(DEBUG, "MQ GET response result : %d",
+            clientResponse->result);
+    NS_LOG_V(DEBUG, "MQ GET response sequenceNum : %d",
+            clientResponse->sequenceNumber);
+    NS_LOG_V(DEBUG, "MQ GET response resource uri : %s",
+            clientResponse->resourceUri);
+    NS_LOG_V(DEBUG, "MQ GET response Transport Type : %d",
+                    clientResponse->devAddr.adapter);
+
+    char ** topicList = NULL;
+    size_t dimensions[MAX_REP_ARRAY_DEPTH] = {0};
+    OCRepPayloadGetStringArray((OCRepPayload *) clientResponse->payload,
+                               NS_ATTIRBUTE_MQ_TOPICLIST, & topicList, dimensions);
+
+    char * interestTopicName = (char *) ctx;
+    for (size_t i = 0; i < dimensions[0]; ++i)
+    {
+        NS_LOG_V(DEBUG, "found MQ topic : %s", topicList[i]);
+        if (!strcmp(topicList[i], interestTopicName))
+        {
+            NS_LOG(DEBUG, "subscribe to MQ notification");
+
+            OCStackResult ret = NSInvokeRequest(NULL,
+                                  OC_REST_OBSERVE, clientResponse->addr, topicList[i], NULL,
+                                  NSConsumerMQListener, NULL, NULL, CT_DEFAULT);
+
+            if (!NSOCResultToSuccess(ret))
+            {
+                NS_LOG(DEBUG, "fail to subscribe to MQ notification");
+                continue;
+            }
+        }
+    }
+
+
+    return OC_STACK_KEEP_TRANSACTION;
+}
+
+OCStackApplicationResult NSConsumerMQListener(
+        void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
+{
+    (void) ctx;
+    (void) handle;
+
+    if (!NSIsStartedConsumer())
+    {
+        return OC_STACK_DELETE_TRANSACTION;
+    }
+
+    NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
+    NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
+    NS_VERIFY_NOT_NULL(clientResponse->payload, OC_STACK_KEEP_TRANSACTION);
+
+    NS_LOG(DEBUG, "income observe response of MQ notification");
+    NS_LOG_V(DEBUG, "MQ OBS response income : %s:%d",
+            clientResponse->devAddr.addr, clientResponse->devAddr.port);
+    NS_LOG_V(DEBUG, "MQ OBS response result : %d",
+            clientResponse->result);
+    NS_LOG_V(DEBUG, "MQ OBS response sequenceNum : %d",
+            clientResponse->sequenceNumber);
+    NS_LOG_V(DEBUG, "MQ OBS response resource uri : %s",
+            clientResponse->resourceUri);
+    NS_LOG_V(DEBUG, "MQ OBS response Transport Type : %d",
+                    clientResponse->devAddr.adapter);
+
+    OCRepPayload * payload = NULL;
+    OCRepPayloadGetPropObject((OCRepPayload *)clientResponse->payload, NS_ATTRIBUTE_MQ_MESSAGE,
+                              & payload);
+    NS_VERIFY_NOT_NULL(payload, OC_STACK_KEEP_TRANSACTION);
+
+    NSMessage * newMsg = NSGetMessage(payload);
+    NS_VERIFY_NOT_NULL(newMsg, OC_STACK_KEEP_TRANSACTION);
+
+    NSTask * task = NULL;
+
+    if (newMsg->type == NS_MESSAGE_READ || newMsg->type == NS_MESSAGE_DELETED)
+    {
+        NSSyncInfo * syncInfo = (NSSyncInfo *)OICMalloc(sizeof(NSSyncInfo));
+        NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(syncInfo,
+                              OC_STACK_KEEP_TRANSACTION, NSRemoveMessage(newMsg));
+
+        syncInfo->messageId = newMsg->messageId;
+        syncInfo->state = (newMsg->type == NS_MESSAGE_READ) ? NS_SYNC_READ : NS_SYNC_DELETED;
+        OICStrcpy(syncInfo->providerId, sizeof(char) * NS_DEVICE_ID_LENGTH, newMsg->providerId);
+
+        NSRemoveMessage(newMsg);
+
+        NS_LOG(DEBUG, "build NSTask for MQ message sync");
+        task = NSMakeTask(TASK_RECV_SYNCINFO, (void *) syncInfo);
+        NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, OC_STACK_KEEP_TRANSACTION, NSOICFree(syncInfo));
+    }
+    else
+    {
+        NS_LOG(DEBUG, "build NSTask for MQ message receive");
+        task = NSMakeTask(TASK_CONSUMER_RECV_MESSAGE, (void *) newMsg);
+        NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, OC_STACK_KEEP_TRANSACTION,
+                              NSRemoveMessage(newMsg));
+    }
+
+    NSConsumerPushEvent(task);
+
+    return OC_STACK_KEEP_TRANSACTION;
+}
+#endif
diff --git a/service/notification/src/consumer/NSConsumerMQPlugin.h b/service/notification/src/consumer/NSConsumerMQPlugin.h
new file mode 100644 (file)
index 0000000..5c8aa93
--- /dev/null
@@ -0,0 +1,39 @@
+//******************************************************************
+//
+// Copyright 2016 Samsung Electronics All Rights Reserved.
+//
+//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+#ifndef _NS_CONSUMER_MQPLUGIN_H_
+#define _NS_CONSUMER_MQPLUGIN_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+#ifdef WITH_MQ
+#include "NSCommon.h"
+#include "NSStructs.h"
+
+void NSConsumerMQTaskProcessing(NSTask *);
+#endif
+
+#ifdef __cplusplus
+}
+#endif // __cplusplus
+
+#endif // _NS_CONSUMER_MQPLUGIN_H_
index d7552e4..200661f 100644 (file)
 #include "NSConsumerNetworkEventListener.h"
 #include "NSConsumerSystem.h"
 
+#ifdef WITH_MQ
+#include "NSConsumerMQPlugin.h"
+#endif
+
 void * NSConsumerMsgHandleThreadFunc(void * handle);
 
 void * NSConsumerMsgPushThreadFunc(void * data);
@@ -328,6 +332,13 @@ void NSConsumerTaskProcessing(NSTask * task)
             NSConsumerInternalTaskProcessing(task);
             break;
         }
+#ifdef WITH_MQ
+        case TASK_MQ_REQ_SUBSCRIBE:
+        {
+            NSConsumerMQTaskProcessing(task);
+            break;
+        }
+#endif
         default:
         {
             NS_LOG(ERROR, "Unknown type of task");