-//******************************************************************\r
-//\r
-// Copyright 2016 Samsung Electronics All Rights Reserved.\r
-//\r
-//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=\r
-//\r
-// Licensed under the Apache License, Version 2.0 (the "License");\r
-// you may not use this file except in compliance with the License.\r
-// You may obtain a copy of the License at\r
-//\r
-// http://www.apache.org/licenses/LICENSE-2.0\r
-//\r
-// Unless required by applicable law or agreed to in writing, software\r
-// distributed under the License is distributed on an "AS IS" BASIS,\r
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-// See the License for the specific language governing permissions and\r
-// limitations under the License.\r
-//\r
-//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=\r
-\r
-#include "NSProviderTopic.h"\r
-\r
-NSResult NSSendTopicUpdation();\r
-\r
-NSResult NSInitTopicList()\r
-{\r
- NS_LOG(DEBUG, "NSInitTopicList - IN");\r
- consumerTopicList = NSStorageCreate();\r
- consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;\r
-\r
- registeredTopicList = NSStorageCreate();\r
- registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;\r
-\r
- NS_LOG(DEBUG, "NSInitTopicList - OUT");\r
- return NS_OK;\r
-}\r
-\r
-NSResult NSAddTopics(const char * topicName)\r
-{\r
- NS_LOG(DEBUG, "NSWriteTopicsToStorage()");\r
-\r
- NSCacheTopicData * data = (NSCacheTopicData *)OICMalloc(sizeof(NSCacheTopicData));\r
- data->topicName = topicName;\r
- data->state = NS_TOPIC_UNSUBSCRIBED;\r
-\r
- NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));\r
- element->data = (void *) data;\r
- element->next = NULL;\r
-\r
- if(NSStorageWrite(registeredTopicList, element) != NS_OK)\r
- {\r
- NS_LOG(DEBUG, "fail to write cache");\r
- }\r
- NSSendTopicUpdation();\r
-\r
- NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");\r
- return NS_OK;\r
-}\r
-\r
-NSResult NSDeleteTopics(const char * topicName)\r
-{\r
- NS_LOG(DEBUG, "NSDeleteTopics()");\r
-\r
- if(!topicName)\r
- {\r
- NS_LOG(ERROR, "topicName is NULL");\r
- return NS_ERROR;\r
- }\r
-\r
- NSStorageDelete(registeredTopicList, topicName);\r
- while(NSStorageDelete(consumerTopicList, topicName) != NS_FAIL);\r
- return NS_OK;\r
-}\r
-\r
-NSResult NSSendTopicUpdation()\r
-{\r
- NS_LOG(DEBUG, "NSSendTopicUpdation - IN");\r
-\r
- OCRepPayload* payload = OCRepPayloadCreate();\r
-\r
- if (!payload)\r
- {\r
- NS_LOG(ERROR, "fail to create playload");\r
- return NS_ERROR;\r
- }\r
-\r
- OCResourceHandle rHandle = NULL;\r
- if (NSPutMessageResource(NULL, &rHandle) != NS_OK)\r
- {\r
- NS_LOG(ERROR, "Fail to put message resource");\r
- return NS_ERROR;\r
- }\r
-\r
- OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);\r
- OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);\r
- OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);\r
-\r
- OCObservationId obArray[255] = { 0, };\r
- int obCount = 0;\r
-\r
- NSCacheElement * it = consumerSubList->head;\r
-\r
- while (it)\r
- {\r
- NSCacheSubData * subData = (NSCacheSubData *) it->data;\r
-\r
- if (subData->isWhite)\r
- {\r
- if(subData->messageObId != 0)\r
- {\r
- obArray[obCount++] = subData->messageObId;\r
- }\r
-\r
-#ifdef RD_CLIENT\r
- if(subData->remote_messageObId != 0)\r
- {\r
- obArray[obCount++] = subData->remote_messageObId;\r
- }\r
-#endif\r
-\r
- }\r
- it = it->next;\r
- }\r
-\r
- if(!obCount)\r
- {\r
- NS_LOG(ERROR, "observer count is zero");\r
- return NS_ERROR;\r
- }\r
-\r
- if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS)\r
- != OC_STACK_OK)\r
- {\r
- NS_LOG(ERROR, "fail to send topic updation");\r
- OCRepPayloadDestroy(payload);\r
- return NS_ERROR;\r
-\r
- }\r
- OCRepPayloadDestroy(payload);\r
-\r
- NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");\r
- return NS_OK;\r
-}\r
-\r
-NSResult NSSendTopicUpdationToConsumer(char *consumerId)\r
-{\r
- NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");\r
-\r
- OCRepPayload* payload = OCRepPayloadCreate();\r
-\r
- if (!payload)\r
- {\r
- NS_LOG(ERROR, "fail to create playload");\r
- return NS_ERROR;\r
- }\r
-\r
- OCResourceHandle rHandle = NULL;\r
- if (NSPutMessageResource(NULL, &rHandle) != NS_OK)\r
- {\r
- NS_LOG(ERROR, "Fail to put message resource");\r
- return NS_ERROR;\r
- }\r
-\r
- OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);\r
- OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);\r
- OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);\r
-\r
- NSCacheElement * element = NSStorageRead(consumerSubList, consumerId);\r
-\r
- if(element == NULL)\r
- {\r
- NS_LOG(ERROR, "element is NULL");\r
- return NS_ERROR;\r
- }\r
-\r
- NSCacheSubData * subData = (NSCacheSubData*) element->data;\r
-\r
- if (OCNotifyListOfObservers(rHandle, (OCObservationId*)&subData->messageObId, 1, payload, OC_HIGH_QOS)\r
- != OC_STACK_OK)\r
- {\r
- NS_LOG(ERROR, "fail to send topic updation");\r
- OCRepPayloadDestroy(payload);\r
- return NS_ERROR;\r
-\r
- }\r
-\r
- OCRepPayloadDestroy(payload);\r
-\r
- NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");\r
- return NS_OK;\r
-}\r
-\r
-NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)\r
-{\r
- NS_LOG(DEBUG, "NSSendTopicList - IN");\r
-\r
- char * id = NSGetValueFromQuery(OICStrdup(entityHandlerRequest->query), NS_QUERY_CONSUMER_ID);\r
- NSTopicLL * topics = NULL;\r
- NSCacheElement * currList = NULL;\r
-\r
- if(!id)\r
- {\r
- NS_LOG(DEBUG, "Send registered topic list");\r
- topics = NSProviderGetTopicsCacheData(registeredTopicList);\r
- currList = registeredTopicList->head;\r
- }\r
- else\r
- {\r
- NS_LOG(DEBUG, "Send subscribed topic list to consumer");\r
- topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);\r
- currList = consumerTopicList->head;\r
- }\r
-\r
- if(!currList)\r
- {\r
- NS_LOG(DEBUG, "currList is NULL");\r
- return NS_ERROR;\r
- }\r
-\r
- // make response for the Get Request\r
- OCEntityHandlerResponse response;\r
- response.numSendVendorSpecificHeaderOptions = 0;\r
- memset(response.sendVendorSpecificHeaderOptions, 0,\r
- sizeof response.sendVendorSpecificHeaderOptions);\r
- memset(response.resourceUri, 0, sizeof response.resourceUri);\r
-\r
- OCRepPayload* payload = OCRepPayloadCreate();\r
- if (!payload)\r
- {\r
- NS_LOG(ERROR, "payload is NULL");\r
- return NS_ERROR;\r
- }\r
-\r
- // set topics to the array of resource property\r
-\r
- NSCacheElement * iter = currList;\r
- size_t dimensionSize = (size_t)NSProviderGetListSize(iter);\r
-\r
- NS_LOG_V(DEBUG, "dimensionSize = %d", dimensionSize);\r
-\r
- if(!dimensionSize)\r
- {\r
- return NS_ERROR;\r
- }\r
-\r
- OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(\r
- sizeof(OCRepPayload *) * dimensionSize);\r
-\r
- size_t dimensions[3] = {dimensionSize, 0, 0};\r
- for (int i = 0; i < (int)dimensionSize; i++)\r
- {\r
- NSTopicLL * topic = (NSTopicLL *) iter->data;\r
-\r
- NS_LOG_V(DEBUG, "topicName = %s", topic->topicName);\r
- NS_LOG_V(DEBUG, "topicState = %d",(int) topic->state);\r
-\r
- payloadTopicArray[i] = OCRepPayloadCreate();\r
- OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME, topic->topicName);\r
- OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,\r
- (int)topic->state);\r
-\r
- iter = iter->next;\r
- }\r
-\r
- OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);\r
- if(id)\r
- {\r
- OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);\r
- }\r
- OCRepPayloadSetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST,\r
- (const OCRepPayload**)(payloadTopicArray), dimensions);\r
-\r
- response.requestHandle = entityHandlerRequest->requestHandle;\r
- response.resourceHandle = entityHandlerRequest->resource;\r
- response.persistentBufferFlag = 0;\r
- response.ehResult = OC_EH_OK;\r
- response.payload = (OCPayload *) payload;\r
-\r
- if (OCDoResponse(&response) != OC_STACK_OK)\r
- {\r
- NS_LOG(ERROR, "Fail to response topic list");\r
- return NS_ERROR;\r
- }\r
- OCRepPayloadDestroy(payload);\r
-\r
- NS_LOG(DEBUG, "NSSendTopicList - OUT");\r
- return NS_OK;\r
-}\r
-\r
-NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)\r
-{\r
- NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");\r
-\r
- char * consumerId = NULL;\r
- OCRepPayload * payload = entityHandlerRequest->payload;\r
- OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);\r
-\r
- if(!consumerId)\r
- {\r
- NS_LOG(DEBUG, "Invalid consumer ID");\r
- return NS_ERROR;\r
- }\r
-\r
- OCRepPayload ** topicListPayload = NULL;\r
- OCRepPayloadValue * payloadValue = NULL;\r
- payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);\r
- size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);\r
- size_t dimensions[3] = {dimensionSize, 0, 0};\r
- OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, & topicListPayload, dimensions);\r
-\r
- for(int i = 0; i <(int)dimensionSize; i++)\r
- {\r
- char * topicName = NULL;\r
- int topicState = 0;\r
-\r
- OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);\r
- OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState);\r
- NS_LOG_V(DEBUG, "Topic Name(state): %s(%d)", topicName, topicState);\r
-\r
- if(NS_TOPIC_SUBSCRIBED == (NSTopicState)topicState)\r
- {\r
- NSCacheTopicSubData * topicSubData =\r
- (NSCacheTopicSubData *) OICMalloc(sizeof(NSCacheTopicSubData));\r
-\r
- OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);\r
- topicSubData->topicName = OICStrdup(topicName);\r
-\r
- NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));\r
- newObj->data = (NSCacheData *) topicSubData;\r
- newObj->next = NULL;\r
- NSStorageWrite(consumerTopicList, newObj);\r
- }\r
- }\r
-\r
- NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");\r
- return NS_OK;\r
-}\r
-\r
-void * NSTopicSchedule(void * ptr)\r
-{\r
- if (ptr == NULL)\r
- {\r
- NS_LOG(DEBUG, "Create NSTopicSchedule");\r
- }\r
-\r
- while (NSIsRunning[TOPIC_SCHEDULER])\r
- {\r
- sem_wait(&NSSemaphore[TOPIC_SCHEDULER]);\r
- pthread_mutex_lock(&NSMutex[TOPIC_SCHEDULER]);\r
-\r
- if (NSHeadMsg[TOPIC_SCHEDULER] != NULL)\r
- {\r
- NSTask *node = NSHeadMsg[TOPIC_SCHEDULER];\r
- NSHeadMsg[TOPIC_SCHEDULER] = node->nextTask;\r
-\r
- switch (node->taskType)\r
- {\r
- case TASK_SEND_TOPICS:\r
- NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");\r
- NSSendTopicList((OCEntityHandlerRequest*) node->taskData);\r
- NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);\r
- break;\r
- case TASK_SUBSCRIBE_TOPIC:\r
- NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");\r
- NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));\r
- newObj->data = node->taskData;\r
- newObj->next = NULL;\r
- NSStorageWrite(consumerTopicList, newObj);\r
- break;\r
- case TASK_UNSUBSCRIBE_TOPIC:\r
- NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");\r
- NSProviderDeleteConsumerTopic(consumerTopicList,\r
- (NSCacheTopicSubData *) node->taskData);\r
- NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC AFter: ");\r
- break;\r
- case TASK_ADD_TOPIC:\r
- {\r
- NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");\r
- NSAddTopics((const char *) node->taskData);\r
- }\r
- break;\r
- case TASK_DELETE_TOPIC:\r
- {\r
- NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");\r
- NSDeleteTopics((const char *) node->taskData);\r
- }\r
- break;\r
- case TASK_POST_TOPIC:\r
- {\r
- NS_LOG(DEBUG, "TASK_POST_TOPIC : ");\r
- NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);\r
- NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);\r
- }\r
- break;\r
- default:\r
- break;\r
- }\r
-\r
- OICFree(node);\r
- }\r
-\r
- pthread_mutex_unlock(&NSMutex[TOPIC_SCHEDULER]);\r
- }\r
-\r
- NS_LOG(DEBUG, "Destroy NSTopicSchedule");\r
- return NULL;\r
-}\r
+//******************************************************************
+//
+// Copyright 2016 Samsung Electronics All Rights Reserved.
+//
+//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
+
+#include "NSProviderTopic.h"
+#include "oic_string.h"
+#include "oic_malloc.h"
+#include <pthread.h>
+
+NSCacheList * consumerTopicList;
+NSCacheList * registeredTopicList;
+
+NSResult NSSendTopicUpdation();
+
+NSResult NSInitTopicList()
+{
+ NS_LOG(DEBUG, "NSInitTopicList - IN");
+
+ consumerTopicList = NSProviderStorageCreate();
+ NS_VERIFY_NOT_NULL(consumerTopicList, NS_FAIL);
+ consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
+
+ registeredTopicList = NSProviderStorageCreate();
+ NS_VERIFY_NOT_NULL(registeredTopicList, NS_FAIL);
+ registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;
+
+ NS_LOG(DEBUG, "NSInitTopicList - OUT");
+ return NS_OK;
+}
+
+size_t NSProviderGetTopicListSize(NSTopicLL * firstElement)
+{
+ if (!firstElement)
+ {
+ return 0;
+ }
+
+ int cnt = 0;
+
+ NSTopicLL * iter = firstElement;
+
+ while (iter)
+ {
+ cnt++;
+ iter = iter->next;
+ }
+
+ return cnt;
+}
+
+NSResult NSRegisterTopic(const char * topicName)
+{
+ NS_LOG(DEBUG, "NSWriteTopicsToStorage()");
+
+ NSCacheTopicData * data = (NSCacheTopicData *) OICMalloc(sizeof(NSCacheTopicData));
+ NS_VERIFY_NOT_NULL(data, NS_FAIL);
+ data->topicName = (char *) topicName;
+ data->state = NS_TOPIC_UNSUBSCRIBED;
+
+ NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
+ if (!element)
+ {
+ NSOICFree(data->topicName);
+ NSOICFree(data);
+ return NS_FAIL;
+ }
+
+ element->data = (void *) data;
+ element->next = NULL;
+
+ if (NSProviderStorageWrite(registeredTopicList, element) != NS_OK)
+ {
+ NS_LOG(DEBUG, "fail to write cache");
+ return NS_FAIL;
+ }
+
+ NSSendTopicUpdation();
+ NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");
+ return NS_OK;
+}
+
+NSResult NSUnregisterTopic(const char * topicName)
+{
+ NS_LOG(DEBUG, "NSDeleteTopics()");
+ NSResult result = NS_OK;
+
+ if (!topicName)
+ {
+ NS_LOG(ERROR, "topicName is NULL");
+ return NS_ERROR;
+ }
+
+ result = NSProviderStorageDelete(registeredTopicList, topicName);
+
+ while (NSProviderStorageDelete(consumerTopicList, topicName) != NS_FAIL)
+ {
+ }
+
+ if (result == NS_OK)
+ {
+ NSSendTopicUpdation();
+ }
+
+ return result;
+}
+
+NSResult NSSendTopicUpdation()
+{
+ NS_LOG(DEBUG, "NSSendTopicUpdation - IN");
+
+ OCRepPayload* payload = OCRepPayloadCreate();
+
+ if (!payload)
+ {
+ NS_LOG(ERROR, "fail to create playload");
+ return NS_ERROR;
+ }
+
+ OCResourceHandle rHandle = NULL;
+ if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
+ {
+ NS_LOG(ERROR, "Fail to put message resource");
+ return NS_ERROR;
+ }
+
+ OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
+ OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
+ OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
+
+ OCObservationId obArray[3839] =
+ { 0, };
+ int obCount = 0;
+
+ NSCacheElement * it = consumerSubList->head;
+
+ while (it)
+ {
+ NSCacheSubData * subData = (NSCacheSubData *) it->data;
+
+ if (subData->isWhite)
+ {
+ if (subData->messageObId != 0)
+ {
+ obArray[obCount++] = subData->messageObId;
+ }
+
+#if (defined WITH_CLOUD)
+ if (subData->remote_messageObId != 0)
+ {
+ obArray[obCount++] = subData->remote_messageObId;
+ }
+#endif
+ }
+
+ it = it->next;
+ }
+
+ if (!obCount)
+ {
+ NS_LOG(ERROR, "observer count is zero");
+ return NS_ERROR;
+ }
+
+ if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS) != OC_STACK_OK)
+ {
+ NS_LOG(ERROR, "fail to send topic updation");
+ OCRepPayloadDestroy(payload);
+ return NS_ERROR;
+
+ }
+ OCRepPayloadDestroy(payload);
+
+ NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");
+ return NS_OK;
+}
+
+NSResult NSSendTopicUpdationToConsumer(char *consumerId)
+{
+ NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");
+
+ OCRepPayload* payload = OCRepPayloadCreate();
+
+ if (!payload)
+ {
+ NS_LOG(ERROR, "fail to create playload");
+ return NS_ERROR;
+ }
+
+ OCResourceHandle rHandle = NULL;
+ if (NSPutMessageResource(NULL, &rHandle) != NS_OK)
+ {
+ NS_LOG(ERROR, "Fail to put message resource");
+ return NS_ERROR;
+ }
+
+ OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);
+ OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);
+ OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
+
+ NSCacheElement * element = NSProviderStorageRead(consumerSubList, consumerId);
+
+ if (element == NULL)
+ {
+ NS_LOG(ERROR, "element is NULL");
+ return NS_ERROR;
+ }
+
+ NSCacheSubData * subData = (NSCacheSubData*) element->data;
+
+ if (OCNotifyListOfObservers(rHandle, (OCObservationId*) &subData->messageObId, 1, payload,
+ OC_HIGH_QOS) != OC_STACK_OK)
+ {
+ NS_LOG(ERROR, "fail to send topic updation");
+ OCRepPayloadDestroy(payload);
+ return NS_ERROR;
+ }
+
+ OCRepPayloadDestroy(payload);
+
+ NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");
+ return NS_OK;
+}
+
+NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)
+{
+ NS_LOG(DEBUG, "NSSendTopicList - IN");
+
+ char * copyReq = OICStrdup(entityHandlerRequest->query);
+ char * id = NSGetValueFromQuery(copyReq, NS_QUERY_CONSUMER_ID);
+ NSTopicLL * topics = NULL;
+
+ if (!id)
+ {
+ NS_LOG(DEBUG, "Send registered topic list");
+ topics = NSProviderGetTopicsCacheData(registeredTopicList);
+ }
+ else
+ {
+ NS_LOG(DEBUG, "Send subscribed topic list to consumer");
+ topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);
+ if (!topics)
+ {
+ topics = NSProviderGetTopicsCacheData(registeredTopicList);
+ }
+ }
+
+ // make response for the Get Request
+ OCEntityHandlerResponse response;
+ response.numSendVendorSpecificHeaderOptions = 0;
+ memset(response.sendVendorSpecificHeaderOptions, 0,
+ sizeof response.sendVendorSpecificHeaderOptions);
+ memset(response.resourceUri, 0, sizeof response.resourceUri);
+
+ OCRepPayload* payload = OCRepPayloadCreate();
+ if (!payload)
+ {
+ NS_LOG(ERROR, "payload is NULL");
+ NSOICFree(copyReq);
+ return NS_ERROR;
+ }
+
+ OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);
+ if (id)
+ {
+ OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);
+ }
+ OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);
+ NSOICFree(copyReq);
+
+ if (topics)
+ {
+ NS_LOG(DEBUG, "topicList is NULL");
+ size_t dimensionSize = (size_t) NSProviderGetTopicListSize(topics);
+ NS_LOG_V(DEBUG, "dimensionSize = %d", (int)dimensionSize);
+
+ if (!dimensionSize)
+ {
+ return NS_ERROR;
+ }
+
+ OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(
+ sizeof(OCRepPayload *) * dimensionSize);
+ NS_VERIFY_NOT_NULL(payloadTopicArray, NS_ERROR);
+
+ size_t dimensions[3] = { dimensionSize, 0, 0 };
+
+ for (int i = 0; i < (int) dimensionSize; i++)
+ {
+ NS_LOG_V(DEBUG, "topicName = %s", topics->topicName);
+ NS_LOG_V(DEBUG, "topicState = %d",(int) topics->state);
+
+ payloadTopicArray[i] = OCRepPayloadCreate();
+ if (!payloadTopicArray[i])
+ {
+ NS_LOG_V(ERROR, "payloadTopicArray[%d] is NULL", i);
+ NSOICFree(payloadTopicArray);
+ return NS_ERROR;
+ }
+
+ OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME,
+ topics->topicName);
+ OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,
+ (int) topics->state);
+
+ NSTopicLL * next = topics->next;
+ NSOICFree(topics->topicName);
+ NSOICFree(topics);
+ topics = next;
+ }
+
+ OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
+ payloadTopicArray, dimensions);
+ }
+ else
+ {
+ size_t dimensions[3] = { 0, 0, 0 };
+
+ OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,
+ (OCRepPayload **) NULL, dimensions);
+ }
+
+ copyReq = OICStrdup(entityHandlerRequest->query);
+ char * reqInterface = NSGetValueFromQuery(copyReq, NS_QUERY_INTERFACE);
+
+ if (reqInterface && strcmp(reqInterface, NS_INTERFACE_BASELINE) == 0)
+ {
+ OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_BASELINE);
+ OCResourcePayloadAddStringLL(&payload->interfaces, NS_INTERFACE_READ);
+ OCResourcePayloadAddStringLL(&payload->types, NS_ROOT_TYPE);
+ }
+
+ NSOICFree(copyReq);
+ response.requestHandle = entityHandlerRequest->requestHandle;
+ response.resourceHandle = entityHandlerRequest->resource;
+ response.persistentBufferFlag = 0;
+ response.ehResult = OC_EH_OK;
+ response.payload = (OCPayload *) payload;
+
+ if (OCDoResponse(&response) != OC_STACK_OK)
+ {
+ NS_LOG(ERROR, "Fail to response topic list");
+ OCRepPayloadDestroy(payload);
+ return NS_ERROR;
+ }
+
+ OCRepPayloadDestroy(payload);
+ NS_LOG(DEBUG, "NSSendTopicList - OUT");
+ return NS_OK;
+}
+
+NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)
+{
+ NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");
+
+ char * consumerId = NULL;
+ OCRepPayload * payload = (OCRepPayload *) entityHandlerRequest->payload;
+ OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);
+
+ if (!consumerId)
+ {
+ NS_LOG(DEBUG, "Invalid consumer ID");
+ return NS_FAIL;
+ }
+
+ NS_LOG_V(INFO_PRIVATE, "TOPIC consumer ID = %s", consumerId);
+
+ consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID;
+
+ while (NSProviderStorageDelete(consumerTopicList, consumerId) != NS_FAIL)
+ {
+ }
+
+ consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;
+ OCRepPayload ** topicListPayload = NULL;
+ OCRepPayloadValue * payloadValue = NULL;
+ payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);
+ size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);
+ size_t dimensions[3] = { dimensionSize, 0, 0 };
+ OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, &topicListPayload, dimensions);
+
+ for (int i = 0; i < (int) dimensionSize; i++)
+ {
+ char * topicName = NULL;
+ int64_t topicState = 0;
+
+ OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);
+ if (OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState))
+ {
+ NS_LOG_V(DEBUG, "Topic Name(state): %s(%d)", topicName, (int)topicState);
+ }
+
+ if (NS_TOPIC_SUBSCRIBED == (NSTopicState) topicState)
+ {
+ NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *) OICMalloc(
+ sizeof(NSCacheTopicSubData));
+ NS_VERIFY_NOT_NULL_EXIT(topicSubData);
+
+ OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);
+ topicSubData->topicName = topicName;
+
+ NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
+
+ if (!newObj)
+ {
+ NSOICFree(topicSubData->topicName);
+ NSOICFree(topicSubData);
+ NSOICFree(consumerId);
+
+ // Free topic list
+ for (size_t k = 0; k < dimensionSize; k++)
+ {
+ OCRepPayloadDestroy(topicListPayload[k]);
+ }
+ OICFree(topicListPayload);
+
+ return NS_FAIL;
+ }
+
+ newObj->data = (NSCacheData *) topicSubData;
+ newObj->next = NULL;
+
+ if (NS_OK != NSProviderStorageWrite(consumerTopicList, newObj))
+ {
+ NS_LOG(ERROR, "Fail to write cache");
+ }
+ }
+ }
+ NSSendTopicUpdationToConsumer(consumerId);
+
+ // Free topic list
+ for (size_t k = 0; k < dimensionSize; k++)
+ {
+ OCRepPayloadDestroy(topicListPayload[k]);
+ }
+ OICFree(topicListPayload);
+
+ NSOICFree(consumerId);
+ NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");
+ return NS_OK;
+
+exit:
+ OICFree(topicListPayload);
+ return NS_FAIL;
+}
+
+void * NSTopicSchedule(void * ptr)
+{
+ if (ptr == NULL)
+ {
+ NS_LOG(DEBUG, "Create NSTopicSchedule");
+ }
+
+ while (NSIsRunning[TOPIC_SCHEDULER])
+ {
+ sem_wait(&NSSemaphore[TOPIC_SCHEDULER]);
+ pthread_mutex_lock(&NSMutex[TOPIC_SCHEDULER]);
+
+ if (NSHeadMsg[TOPIC_SCHEDULER] != NULL)
+ {
+ NSTask *node = NSHeadMsg[TOPIC_SCHEDULER];
+ NSHeadMsg[TOPIC_SCHEDULER] = node->nextTask;
+
+ switch (node->taskType)
+ {
+ case TASK_SEND_TOPICS:
+ NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");
+ NSSendTopicList((OCEntityHandlerRequest*) node->taskData);
+ NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
+ break;
+ case TASK_SUBSCRIBE_TOPIC:
+ {
+ NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");
+ NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
+ pthread_mutex_lock(topicSyncResult->mutex);
+ NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));
+ NSCacheTopicSubData * subData =
+ (NSCacheTopicSubData *) topicSyncResult->topicData;
+ if (!newObj)
+ {
+ NSOICFree(subData->topicName);
+ NSOICFree(subData);
+ pthread_cond_signal(topicSyncResult->condition);
+ pthread_mutex_unlock(topicSyncResult->mutex);
+ }
+ else
+ {
+ if (NSProviderStorageRead(registeredTopicList, subData->topicName))
+ {
+ newObj->data = topicSyncResult->topicData;
+ newObj->next = NULL;
+
+ if (NSProviderStorageWrite(consumerTopicList, newObj) == NS_OK)
+ {
+ if(subData)
+ {
+ NSSendTopicUpdationToConsumer(subData->id);
+ }
+ topicSyncResult->result = NS_OK;
+ }
+ }
+ else
+ {
+ NSOICFree(subData->topicName);
+ NSOICFree(subData);
+ NSOICFree(newObj);
+ }
+ }
+ pthread_cond_signal(topicSyncResult->condition);
+ pthread_mutex_unlock(topicSyncResult->mutex);
+ }
+ break;
+ case TASK_UNSUBSCRIBE_TOPIC:
+ {
+ NS_LOG(DEBUG, "CASE TASK_UNSUBSCRIBE_TOPIC : ");
+ NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
+ pthread_mutex_lock(topicSyncResult->mutex);
+ NSCacheTopicSubData * topicSubData =
+ (NSCacheTopicSubData *) topicSyncResult->topicData;
+
+ if (NSProviderDeleteConsumerTopic(consumerTopicList, topicSubData) == NS_OK)
+ {
+ NSSendTopicUpdationToConsumer(topicSubData->id);
+ topicSyncResult->result = NS_OK;
+ }
+
+ NSOICFree(topicSubData->topicName);
+ NSOICFree(topicSubData);
+ pthread_cond_signal(topicSyncResult->condition);
+ pthread_mutex_unlock(topicSyncResult->mutex);
+
+ }
+ break;
+ case TASK_REGISTER_TOPIC:
+ {
+ NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");
+ NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
+
+ pthread_mutex_lock(topicSyncResult->mutex);
+ topicSyncResult->result = NSRegisterTopic(
+ (const char *) topicSyncResult->topicData);
+ pthread_cond_signal(topicSyncResult->condition);
+ pthread_mutex_unlock(topicSyncResult->mutex);
+ }
+ break;
+ case TASK_UNREGISTER_TOPIC:
+ {
+ NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");
+ NSTopicSyncResult * topicSyncResult = (NSTopicSyncResult *) node->taskData;
+ pthread_mutex_lock(topicSyncResult->mutex);
+ topicSyncResult->result = NSUnregisterTopic(
+ (const char *) topicSyncResult->topicData);
+ pthread_cond_signal(topicSyncResult->condition);
+ pthread_mutex_unlock(topicSyncResult->mutex);
+ }
+ break;
+ case TASK_POST_TOPIC:
+ {
+ NS_LOG(DEBUG, "TASK_POST_TOPIC : ");
+ NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);
+ NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);
+ }
+ break;
+ case TASK_GET_TOPICS:
+ {
+ NS_LOG(DEBUG, "TASK_GET_TOPICS : ");
+ NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
+ pthread_mutex_lock(topicSync->mutex);
+ NSTopicLL * topics = NSProviderGetTopicsCacheData(registeredTopicList);
+ topicSync->topics = topics;
+ pthread_cond_signal(topicSync->condition);
+ pthread_mutex_unlock(topicSync->mutex);
+ }
+ break;
+ case TAST_GET_CONSUMER_TOPICS:
+ {
+ NS_LOG(DEBUG, "TASK_GET_CONSUMER_TOPICS : ");
+ NSTopicSync * topicSync = (NSTopicSync *) node->taskData;
+ pthread_mutex_lock(topicSync->mutex);
+ NSTopicLL * topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList,
+ consumerTopicList, topicSync->consumerId);
+ topicSync->topics = topics;
+ pthread_cond_signal(topicSync->condition);
+ pthread_mutex_unlock(topicSync->mutex);
+ }
+ break;
+ default:
+ break;
+ }
+
+ NSOICFree(node);
+ }
+
+ pthread_mutex_unlock(&NSMutex[TOPIC_SCHEDULER]);
+ }
+
+ NS_LOG(DEBUG, "Destroy NSTopicSchedule");
+ return NULL;
+}