Modify Consumer to support Topic functionality
authorYounghyunJoo <yh_.joo@samsung.com>
Tue, 9 Aug 2016 10:07:09 +0000 (19:07 +0900)
committerMadan Lanka <lanka.madan@samsung.com>
Wed, 10 Aug 2016 04:23:34 +0000 (04:23 +0000)
- Add 2 APIs for Topic
- Add Logic flow to support Topic

Change-Id: If44abf508a22aed6a70d4497eed214c4f610792d
Signed-off-by: YounghyunJoo <yh_.joo@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/10193
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Madan Lanka <lanka.madan@samsung.com>
service/notification/include/NSConsumerInterface.h
service/notification/src/common/NSConstants.h
service/notification/src/common/NSUtil.c
service/notification/src/common/NSUtil.h
service/notification/src/consumer/NSConsumerCommunication.c
service/notification/src/consumer/NSConsumerCommunication.h
service/notification/src/consumer/NSConsumerDiscovery.c
service/notification/src/consumer/NSConsumerInterface.c
service/notification/src/consumer/NSConsumerInternalTaskController.c
service/notification/src/consumer/NSConsumerScheduler.c

index efbc2c1..79a3fcb 100644 (file)
@@ -135,6 +135,20 @@ NSProvider * NSConsumerGetProvider(const char * providerId);
  */
 NSMessage * NSConsumerGetMessage(uint64_t messageId);
 
+/**
+ * Request NSTopicList that is subscribed from provider
+ * @param[in]  provider  the provider that user wants to get
+ * @return NSResult
+ */
+NSResult NSConsumerGetInterestTopics(NSProvider * provider);
+
+/**
+ * Select Topics that is wanted to subscribe from provider
+ * @param[in]  provider  the provider that user wants to set
+ * @return NSResult
+ */
+NSResult NSConsumerSelectInterestTopics(NSProvider * provider);
+
 #ifdef __cplusplus
 }
 #endif // __cplusplus
index 1bc12ef..646c786 100644 (file)
@@ -224,6 +224,12 @@ typedef enum eTaskType
     TASK_CONSUMER_PROVIDER_DELETED = 8202,
     TASK_CONSUMER_RECV_CONFIRM = 8206,
 
+    TASK_CONSUMER_REQ_TOPIC_URI = 8299,
+    TASK_CONSUMER_REQ_TOPIC_LIST = 8300,
+    TASK_CONSUMER_RECV_TOPIC_LIST = 8031,
+    TASK_CONSUMER_GET_TOPIC_LIST = 8302,
+    TASK_CONSUMER_SELECT_TOPIC_LIST = 8303,
+
     TASK_EVENT_CONNECTED = 9000,
     TASK_EVENT_CONNECTED_TCP = 9001,
     TASK_EVENT_DISCONNECTED = 9002,
index d5885e6..e3b221a 100755 (executable)
@@ -425,6 +425,26 @@ NSMessage * NSInitializeMessage()
     return msg;
 }
 
+OCRepPayloadValue* NSPayloadFindValue(const OCRepPayload* payload, const char* name)
+{
+    if (!payload || !name)
+    {
+        return NULL;
+    }
+
+    OCRepPayloadValue* val = payload->values;
+    while(val)
+    {
+        if (0 == strcmp(val->name, name))
+        {
+            return val;
+        }
+        val = val->next;
+    }
+
+    return NULL;
+}
+
 NSTopicList * NSInitializeTopicList()
 {
     NSTopicList * topicList = (NSTopicList *)OICMalloc(sizeof(NSTopicList));
index df3de1e..b6bfd79 100755 (executable)
@@ -61,4 +61,6 @@ NSResult NSFreeMalloc(char ** obj);
 NSResult NSFreeMediaContents(NSMediaContents * obj);\r
 NSMediaContents * NSDuplicateMediaContents(NSMediaContents * copyObj);\r
 \r
+OCRepPayloadValue* NSPayloadFindValue(const OCRepPayload* payload, const char* name);\r
+\r
 #endif /* _NS_UTIL__H_ */\r
index 12f9278..1a59ad3 100644 (file)
@@ -172,6 +172,11 @@ OCStackApplicationResult NSConsumerMessageListener(
         NS_LOG(DEBUG, "Receive Subscribe confirm");
         type = TASK_CONSUMER_RECV_SUBSCRIBE_CONFIRMED;
     }
+    else if (newNoti->messageId == NS_TOPIC)
+    {
+        NS_LOG(DEBUG, "Receive Topic change");
+        type = TASK_CONSUMER_REQ_TOPIC_URI;
+    }
     else
     {
         NS_LOG(DEBUG, "Receive new message");
@@ -397,9 +402,232 @@ void NSConsumerCommunicationTaskProcessing(NSTask * task)
             connections = connections->next;
         }
     }
+    else if (task->taskType == TASK_CONSUMER_REQ_TOPIC_LIST)
+    {
+        NSProvider_internal * provider = (NSProvider_internal *)task->taskData;
+
+        NSProviderConnectionInfo * connections = provider->connection;
+        NS_VERIFY_NOT_NULL_V(connections);
+
+        char * topicUri = OICStrdup(provider->topicUri);
+
+        OCConnectivityType type = CT_DEFAULT;
+        if (connections->addr->adapter == OC_ADAPTER_TCP)
+        {
+            type = CT_ADAPTER_TCP;
+            if (connections->isCloudConnection == true)
+            {
+                topicUri = NSGetCloudUri(provider->providerId, topicUri);
+            }
+        }
+
+        OCStackResult ret = NSInvokeRequest(NULL, OC_REST_GET, connections->addr,
+                                topicUri, NULL, NSIntrospectTopic, (void *) provider, type);
+        NS_VERIFY_STACK_SUCCESS_V(NSOCResultToSuccess(ret));
+        NSOICFree(topicUri);
+    }
+    else if (task->taskType == TASK_CONSUMER_GET_TOPIC_LIST)
+    {
+        NSProvider_internal * provider = (NSProvider_internal *)task->taskData;
+
+        NSProviderConnectionInfo * connections = provider->connection;
+        NS_VERIFY_NOT_NULL_V(connections);
+
+        char * topicUri = OICStrdup(provider->topicUri);
+
+        OCConnectivityType type = CT_DEFAULT;
+        if (connections->addr->adapter == OC_ADAPTER_TCP)
+        {
+            type = CT_ADAPTER_TCP;
+            if (connections->isCloudConnection == true)
+            {
+                topicUri = NSGetCloudUri(provider->providerId, topicUri);
+            }
+        }
+
+        NS_LOG(DEBUG, "get topic query");
+        char * query = NULL;
+        query = NSMakeRequestUriWithConsumerId(topicUri);
+        NS_VERIFY_NOT_NULL_V(query);
+        NS_LOG_V(DEBUG, "topic query : %s", query);
+
+        OCStackResult ret = NSInvokeRequest(NULL, OC_REST_GET, connections->addr,
+                                query, NULL, NSIntrospectTopic, NULL, type);
+        NS_VERIFY_STACK_SUCCESS_V(NSOCResultToSuccess(ret));
+        NSOICFree(query);
+        NSOICFree(topicUri);
+    }
+    else if (task->taskType == TASK_CONSUMER_SELECT_TOPIC_LIST)
+    {
+        NSProvider_internal * provider = (NSProvider_internal *)task->taskData;
+
+        NSProviderConnectionInfo * connections = provider->connection;
+        NS_VERIFY_NOT_NULL_V(connections);
+
+        OCRepPayload * payload = OCRepPayloadCreate();
+        NS_VERIFY_NOT_NULL_V(payload);
+        OCRepPayload ** topicPayload = (OCRepPayload **) OICMalloc(
+                                        sizeof(OCRepPayload *)*provider->topicListSize);
+        NS_VERIFY_NOT_NULL_V(topicPayload);
+
+        OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, *NSGetConsumerId());
+
+        NSTopic ** topic = provider->topicList->topics;
+
+        for (int i = 0; i < (int)provider->topicListSize; i++)
+        {
+            topicPayload[i] = OCRepPayloadCreate();
+            OCRepPayloadSetPropString(topicPayload[i], NS_ATTRIBUTE_TOPIC_NAME, topic[i]->topicName);
+            OCRepPayloadSetPropInt(topicPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, topic[i]->state);
+        }
+
+        size_t dimensions[3] = {provider->topicListSize, 0, 0};
+        OCRepPayloadSetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, (const OCRepPayload **)topicPayload, dimensions);
+
+        char * topicUri = OICStrdup(provider->topicUri);
+
+        OCConnectivityType type = CT_DEFAULT;
+        if (connections->addr->adapter == OC_ADAPTER_TCP)
+        {
+            type = CT_ADAPTER_TCP;
+            if (connections->isCloudConnection == true)
+            {
+                topicUri = NSGetCloudUri(provider->providerId, topicUri);
+            }
+        }
+
+        NS_LOG(DEBUG, "get topic query");
+        char * query = NULL;
+        query = NSMakeRequestUriWithConsumerId(topicUri);
+        NS_VERIFY_NOT_NULL_V(query);
+        NS_LOG_V(DEBUG, "topic query : %s", query);
+
+        OCStackResult ret = NSInvokeRequest(NULL, OC_REST_GET, connections->addr,
+                                query, (OCPayload*)payload, NSConsumerCheckPostResult, NULL, type);
+        NS_VERIFY_STACK_SUCCESS_V(NSOCResultToSuccess(ret));
+        NSOICFree(query);
+        NSOICFree(topicUri);
+    }
     else
     {
         NS_LOG(ERROR, "Unknown type message");
     }
     NSOICFree(task);
 }
+
+void NSGetTopicPostClean(
+        char * cId, NSTopicList * tList, size_t dSize)
+{
+    NSOICFree(cId);
+    NSRemoveProviderTopicList(tList, dSize);
+}
+
+NSTopicList * NSGetTopic(OCClientResponse * clientResponse, size_t * topicListSize)
+{
+    NS_LOG(DEBUG, "create NSTopic");
+    NS_VERIFY_NOT_NULL(clientResponse->payload, NULL);
+
+    OCRepPayload * payload = (OCRepPayload *)clientResponse->payload;
+    while (payload)
+    {
+        NS_LOG_V(DEBUG, "Payload Key : %s", payload->values->name);
+        payload = payload->next;
+    }
+
+    payload = (OCRepPayload *)clientResponse->payload;
+
+    char * consumerId = NULL;
+    OCRepPayload ** topicListPayload = NULL;
+    NSTopicList * topicList = (NSTopicList *) OICMalloc(sizeof(NSTopicList));
+    NS_VERIFY_NOT_NULL(topicList, NULL);
+
+    NS_LOG(DEBUG, "get information of consumerId");
+    bool getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, & consumerId); // is NULL possible? (initial getting)
+    NS_VERIFY_NOT_NULL(getResult == true ? (void *) 1 : NULL, NULL);
+
+    OICStrcpy(topicList->consumerId, NS_DEVICE_ID_LENGTH, consumerId);
+
+    OCRepPayloadValue * payloadValue = NULL;
+    payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(payloadValue, NULL, NSOICFree(consumerId));
+
+    size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);
+    size_t dimensions[3] = {dimensionSize, 0, 0};
+    *topicListSize = dimensionSize;
+
+    NS_LOG(DEBUG, "get information of topicList(OCRepPayload)");
+    getResult = OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, 
+            & topicListPayload, dimensions);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL, 
+            NULL, NSOICFree(consumerId));
+
+    topicList->topics = (NSTopic **) OICMalloc(sizeof(NSTopic *)*dimensionSize);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(topicList->topics,
+            NULL, NSGetTopicPostClean(consumerId, topicList, -1));
+
+    for (int i = 0; i < (int)dimensionSize; i++)
+    {
+        char * topicName = NULL;
+        int64_t state = 0;
+
+        topicList->topics[i] = (NSTopic *) OICMalloc(sizeof(NSTopic));
+        NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(topicList->topics[i],
+                NULL, NSGetTopicPostClean(consumerId, topicList, i));
+
+        NS_LOG(DEBUG, "get topic name");
+        getResult = OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);
+        NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL,
+                NULL, NSGetTopicPostClean(consumerId, topicList, i));
+
+
+        NS_LOG(DEBUG, "get topic selection");
+        getResult = OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &state);
+        NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL,
+                NULL, NSGetTopicPostClean(consumerId, topicList, i));
+
+        topicList->topics[i]->topicName = topicName;
+        topicList->topics[i]->state = state;
+    }
+
+    NSOICFree(consumerId);
+
+    return topicList;
+}
+
+OCStackApplicationResult NSIntrospectTopic(
+        void * ctx, OCDoHandle handle, OCClientResponse * clientResponse)
+{
+    (void) handle;
+
+    NS_VERIFY_NOT_NULL(clientResponse, OC_STACK_KEEP_TRANSACTION);
+    NS_VERIFY_STACK_SUCCESS(NSOCResultToSuccess(clientResponse->result), OC_STACK_KEEP_TRANSACTION);
+
+    NS_LOG_V(DEBUG, "GET response income : %s:%d",
+            clientResponse->devAddr.addr, clientResponse->devAddr.port);
+    NS_LOG_V(DEBUG, "GET response result : %d",
+            clientResponse->result);
+    NS_LOG_V(DEBUG, "GET response sequenceNum : %d",
+            clientResponse->sequenceNumber);
+    NS_LOG_V(DEBUG, "GET response resource uri : %s",
+            clientResponse->resourceUri);
+    NS_LOG_V(DEBUG, "GET response Transport Type : %d",
+                    clientResponse->devAddr.adapter);
+
+    size_t topicListSize = 0;
+    NSTopicList * newTopicList = NSGetTopic(clientResponse, &topicListSize);
+    NS_VERIFY_NOT_NULL(newTopicList, OC_STACK_KEEP_TRANSACTION);
+
+    // TODO Call the callback function registered at the start
+    NSProvider_internal * provider = (NSProvider_internal *) ctx;
+    provider->topicList = NSCopyProviderTopicList(newTopicList, topicListSize);
+    provider->topicListSize = topicListSize;
+
+    NS_LOG(DEBUG, "build NSTask");
+    NSTask * task = NSMakeTask(TASK_CONSUMER_RECV_TOPIC_LIST, (void *) provider);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(task, NS_ERROR, NSRemoveProvider(provider));
+
+    NSConsumerPushEvent(task);
+    NSRemoveProviderTopicList(newTopicList, topicListSize);
+
+    return OC_STACK_KEEP_TRANSACTION;
+}
index 31ddfb7..e46d8b1 100644 (file)
@@ -28,6 +28,7 @@ extern "C" {
 #include <stdlib.h>
 
 #include "NSCommon.h"
+#include "NSConsumerInterface.h"
 #include "NSStructs.h"
 #include "ocstack.h"
 
@@ -39,6 +40,8 @@ OCStackApplicationResult NSConsumerMessageListener(void *, OCDoHandle, OCClientR
 
 OCStackApplicationResult NSConsumerSyncInfoListener(void *, OCDoHandle, OCClientResponse *);
 
+OCStackApplicationResult NSIntrospectTopic(void *, OCDoHandle, OCClientResponse *);
+
 #ifdef __cplusplus
 }
 #endif // __cplusplus
index ea7b9da..894ab8a 100644 (file)
@@ -212,6 +212,11 @@ NSProvider_internal * NSGetProvider(OCClientResponse * clientResponse)
     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL, NULL,
             NSGetProviderPostClean(providerId, messageUri, syncUri, connection));
 
+    NS_LOG(DEBUG, "get topic URI");
+    getResult = OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_TOPIC, & topicUri);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(getResult == true ? (void *) 1 : NULL, NULL,
+            NSGetProviderPostClean(providerId, messageUri, syncUri, topicUri, connection));
+
     NS_LOG(DEBUG, "get provider connection information");
     NS_VERIFY_NOT_NULL(clientResponse->addr, NULL);
     connection = NSCreateProviderConnections(clientResponse->addr);
index 1402248..39f52d6 100644 (file)
@@ -153,6 +153,38 @@ NSMessage * NSConsumerGetMessage(uint64_t messageId)
     return (NSMessage *) NSConsumerFindNSMessage(msgId);
 }
 
+NSResult NSConsumerGetInterestTopics(NSProvider * provider)
+{
+    bool isStartedConsumer = NSIsStartedConsumer();
+    NS_VERIFY_NOT_NULL(isStartedConsumer == true ? (void *) 1 : NULL, NS_ERROR);
+
+    NS_VERIFY_NOT_NULL(provider, NS_ERROR);
+
+    NSTask * topicTask = NSMakeTask(TASK_CONSUMER_GET_TOPIC_LIST, (void *) provider);
+    NS_VERIFY_NOT_NULL(topicTask, NS_ERROR);
+
+    return NSConsumerPushEvent(topicTask);
+}
+
+NSResult NSConsumerSelectInterestTopics(NSProvider * provider)
+{
+    bool isStartedConsumer = NSIsStartedConsumer();
+    NS_VERIFY_NOT_NULL(isStartedConsumer == true ? (void *) 1 : NULL, NS_ERROR);
+
+    NS_VERIFY_NOT_NULL(provider, NS_ERROR);
+
+    if (!provider->topicList)
+        provider->topicList = (NSTopicList *) OICMalloc(sizeof(NSTopicList));
+    NS_VERIFY_NOT_NULL(provider->topicList, NS_ERROR);
+
+    OICStrcpy(provider->topicList->consumerId, NS_DEVICE_ID_LENGTH, provider->providerId);
+
+    NSTask * topicTask = NSMakeTask(TASK_CONSUMER_SELECT_TOPIC_LIST, (void *) provider);
+    NS_VERIFY_NOT_NULL(provider, NS_ERROR);
+
+    return NSConsumerPushEvent(topicTask);
+}
+
 NSResult NSDropNSMessage(NSMessage * obj)
 {
     NS_VERIFY_NOT_NULL(obj, NS_ERROR);
index 49129b4..029b389 100644 (file)
@@ -349,6 +349,34 @@ void NSConsumerHandleMakeSyncInfo(NSSyncInfo * sync)
     NSConsumerPushEvent(syncTask);
 }
 
+void NSConsumerHandleGetTopicUri(NSMessage * msg)
+{
+    NS_VERIFY_NOT_NULL_V(msg);
+
+    NSProvider_internal * provider = NSProviderCacheFind(msg->providerId);
+    NS_VERIFY_NOT_NULL_V(provider);
+
+    NSTask * topicTask = NSMakeTask(TASK_CONSUMER_REQ_TOPIC_LIST, (void *) provider);
+    NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(topicTask, NSRemoveProvider(provider));
+
+    NSConsumerPushEvent(topicTask);
+}
+
+void NSConsumerHandleRecvTopicList(NSProvider_internal * provider)
+{
+    NS_VERIFY_NOT_NULL_V(provider);
+
+    NSResult ret = NSProviderCacheUpdate(provider);
+    NS_VERIFY_NOT_NULL_V(ret == NS_OK ? (void *) 1 : NULL);
+
+    // call the callback function when consumer is an accepter
+    if (provider->connection->next == NULL)
+    {
+        NS_LOG(DEBUG, "call back to user");
+        NSProviderChanged((NSProvider *) provider, (NSResponse) NS_TOPIC);
+    }
+}
+
 void NSConsumerInternalTaskProcessing(NSTask * task)
 {
     NS_VERIFY_NOT_NULL_V(task);
@@ -391,6 +419,20 @@ void NSConsumerInternalTaskProcessing(NSTask * task)
             NSOICFree(task->taskData);
             break;
         }
+        case TASK_CONSUMER_REQ_TOPIC_URI:
+        {
+            NS_LOG(DEBUG, "Request Topic Uri");
+            NSConsumerHandleGetTopicUri((NSMessage *)task->taskData);
+            NSRemoveMessage((NSMessage *)task->taskData);
+            break;
+        }
+        case TASK_CONSUMER_RECV_TOPIC_LIST:
+        {
+            NS_LOG(DEBUG, "Receive Topic List");
+            NSConsumerHandleRecvTopicList((NSProvider_internal *)task->taskData);
+            NSRemoveProvider((NSProvider_internal *)task->taskData);
+            break;
+        }
         case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
         {
             NS_LOG(DEBUG, "Make Subscribe cancel from provider.");
index 3205ba9..4716ddd 100644 (file)
@@ -220,6 +220,9 @@ void NSConsumerTaskProcessing(NSTask * task)
         }
         case TASK_CONSUMER_REQ_SUBSCRIBE:
         case TASK_SEND_SYNCINFO:
+        case TASK_CONSUMER_REQ_TOPIC_LIST:
+        case TASK_CONSUMER_GET_TOPIC_LIST:
+        case TASK_CONSUMER_SELECT_TOPIC_LIST:
         {
             NSConsumerCommunicationTaskProcessing(task);
             break;
@@ -239,6 +242,8 @@ void NSConsumerTaskProcessing(NSTask * task)
         case TASK_CONSUMER_PROVIDER_DISCOVERED:
         case TASK_CONSUMER_RECV_SUBSCRIBE_CONFIRMED:
         case TASK_MAKE_SYNCINFO:
+        case TASK_CONSUMER_REQ_TOPIC_URI:
+        case TASK_CONSUMER_RECV_TOPIC_LIST:
         {
             NSConsumerInternalTaskProcessing(task);
             break;