//-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=\r
\r
#include "NSProviderTopic.h"\r
+#include "oic_string.h"\r
+#include "oic_malloc.h"\r
+#include <pthread.h>\r
\r
-NSTopicList * NSGetTopics(char *consumerId)\r
+NSResult NSSendTopicUpdation();\r
+\r
+NSResult NSInitTopicList()\r
+{\r
+ NS_LOG(DEBUG, "NSInitTopicList - IN");\r
+ consumerTopicList = NSStorageCreate();\r
+ consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;\r
+\r
+ registeredTopicList = NSStorageCreate();\r
+ registeredTopicList->cacheType = NS_PROVIDER_CACHE_REGISTER_TOPIC;\r
+\r
+ NS_LOG(DEBUG, "NSInitTopicList - OUT");\r
+ return NS_OK;\r
+}\r
+\r
+size_t NSProviderGetTopicListSize(NSTopicLL * firstElement)\r
{\r
- NS_LOG(DEBUG, "NSGetTopics()");\r
+ if(!firstElement)\r
+ {\r
+ return 0;\r
+ }\r
+\r
+ int cnt = 0;\r
+\r
+ NSTopicLL * iter = firstElement;\r
+\r
+ while(iter)\r
+ {\r
+ cnt++;\r
+ iter = iter->next;\r
+ }\r
+\r
+ return cnt;\r
+}\r
+\r
+NSResult NSAddTopics(const char * topicName)\r
+{\r
+ NS_LOG(DEBUG, "NSWriteTopicsToStorage()");\r
+\r
+ NSCacheTopicData * data = (NSCacheTopicData *)OICMalloc(sizeof(NSCacheTopicData));\r
+ data->topicName = (char *)topicName;\r
+ data->state = NS_TOPIC_UNSUBSCRIBED;\r
+\r
+ NSCacheElement * element = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));\r
+ element->data = (void *) data;\r
+ element->next = NULL;\r
+\r
+ if(NSStorageWrite(registeredTopicList, element) != NS_OK)\r
+ {\r
+ NS_LOG(DEBUG, "fail to write cache");\r
+ }\r
+ NSSendTopicUpdation();\r
+\r
+ NS_LOG(DEBUG, "NSWriteTopicsToStorage() NS_OK");\r
+ return NS_OK;\r
+}\r
+\r
+NSResult NSDeleteTopics(const char * topicName)\r
+{\r
+ NS_LOG(DEBUG, "NSDeleteTopics()");\r
+\r
+ if(!topicName)\r
+ {\r
+ NS_LOG(ERROR, "topicName is NULL");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ NSStorageDelete(registeredTopicList, topicName);\r
+ while(NSStorageDelete(consumerTopicList, topicName) != NS_FAIL);\r
+ return NS_OK;\r
+}\r
+\r
+NSResult NSSendTopicUpdation()\r
+{\r
+ NS_LOG(DEBUG, "NSSendTopicUpdation - IN");\r
+\r
+ OCRepPayload* payload = OCRepPayloadCreate();\r
+\r
+ if (!payload)\r
+ {\r
+ NS_LOG(ERROR, "fail to create playload");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ OCResourceHandle rHandle = NULL;\r
+ if (NSPutMessageResource(NULL, &rHandle) != NS_OK)\r
+ {\r
+ NS_LOG(ERROR, "Fail to put message resource");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);\r
+ OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);\r
+ OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);\r
+\r
+ OCObservationId obArray[255] = { 0, };\r
+ int obCount = 0;\r
+\r
+ NSCacheElement * it = consumerSubList->head;\r
+\r
+ while (it)\r
+ {\r
+ NSCacheSubData * subData = (NSCacheSubData *) it->data;\r
+\r
+ if (subData->isWhite)\r
+ {\r
+ if(subData->messageObId != 0)\r
+ {\r
+ obArray[obCount++] = subData->messageObId;\r
+ }\r
+\r
+#if(defined WITH_CLOUD && defined RD_CLIENT)\r
+ if(subData->remote_messageObId != 0)\r
+ {\r
+ obArray[obCount++] = subData->remote_messageObId;\r
+ }\r
+#endif\r
+ }\r
+ it = it->next;\r
+ }\r
+\r
+ if(!obCount)\r
+ {\r
+ NS_LOG(ERROR, "observer count is zero");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ if (OCNotifyListOfObservers(rHandle, obArray, obCount, payload, OC_HIGH_QOS)\r
+ != OC_STACK_OK)\r
+ {\r
+ NS_LOG(ERROR, "fail to send topic updation");\r
+ OCRepPayloadDestroy(payload);\r
+ return NS_ERROR;\r
+\r
+ }\r
+ OCRepPayloadDestroy(payload);\r
+\r
+ NS_LOG(DEBUG, "NSSendTopicUpdation - OUT");\r
+ return NS_OK;\r
+}\r
+\r
+NSResult NSSendTopicUpdationToConsumer(char *consumerId)\r
+{\r
+ NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - IN");\r
+\r
+ OCRepPayload* payload = OCRepPayloadCreate();\r
+\r
+ if (!payload)\r
+ {\r
+ NS_LOG(ERROR, "fail to create playload");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ OCResourceHandle rHandle = NULL;\r
+ if (NSPutMessageResource(NULL, &rHandle) != NS_OK)\r
+ {\r
+ NS_LOG(ERROR, "Fail to put message resource");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ OCRepPayloadSetUri(payload, NS_COLLECTION_MESSAGE_URI);\r
+ OCRepPayloadSetPropInt(payload, NS_ATTRIBUTE_MESSAGE_ID, NS_TOPIC);\r
+ OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID, NSGetProviderInfo()->providerId);\r
\r
- NSTopicList * topicList;\r
+ NSCacheElement * element = NSStorageRead(consumerSubList, consumerId);\r
\r
- if(consumerId == NULL)\r
+ if(element == NULL)\r
{\r
- NS_LOG(DEBUG, "All registered topic list");\r
+ NS_LOG(ERROR, "element is NULL");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ NSCacheSubData * subData = (NSCacheSubData*) element->data;\r
+\r
+ if (OCNotifyListOfObservers(rHandle, (OCObservationId*)&subData->messageObId, 1, payload, OC_HIGH_QOS)\r
+ != OC_STACK_OK)\r
+ {\r
+ NS_LOG(ERROR, "fail to send topic updation");\r
+ OCRepPayloadDestroy(payload);\r
+ return NS_ERROR;\r
+\r
+ }\r
+\r
+ OCRepPayloadDestroy(payload);\r
+\r
+ NS_LOG(DEBUG, "NSSendTopicUpdationToConsumer - OUT");\r
+ return NS_OK;\r
+}\r
+\r
+NSResult NSSendTopicList(OCEntityHandlerRequest * entityHandlerRequest)\r
+{\r
+ NS_LOG(DEBUG, "NSSendTopicList - IN");\r
+\r
+ char * id = NSGetValueFromQuery(OICStrdup(entityHandlerRequest->query), NS_QUERY_CONSUMER_ID);\r
+ NSTopicLL * topics = NULL;\r
+\r
+ if(!id)\r
+ {\r
+ NS_LOG(DEBUG, "Send registered topic list");\r
+ topics = NSProviderGetTopicsCacheData(registeredTopicList);\r
+ }\r
+ else\r
+ {\r
+ NS_LOG(DEBUG, "Send subscribed topic list to consumer");\r
+ topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList, consumerTopicList, id);\r
+ if(!topics)\r
+ {\r
+ topics = NSProviderGetTopicsCacheData(registeredTopicList);\r
+ }\r
+ }\r
+\r
+ // make response for the Get Request\r
+ OCEntityHandlerResponse response;\r
+ response.numSendVendorSpecificHeaderOptions = 0;\r
+ memset(response.sendVendorSpecificHeaderOptions, 0,\r
+ sizeof response.sendVendorSpecificHeaderOptions);\r
+ memset(response.resourceUri, 0, sizeof response.resourceUri);\r
+\r
+ OCRepPayload* payload = OCRepPayloadCreate();\r
+ if (!payload)\r
+ {\r
+ NS_LOG(ERROR, "payload is NULL");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ OCRepPayloadSetUri(payload, NS_COLLECTION_TOPIC_URI);\r
+ if(id)\r
+ {\r
+ OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, id);\r
+ }\r
+ OCRepPayloadSetPropString(payload, NS_ATTRIBUTE_PROVIDER_ID,\r
+ NSGetProviderInfo()->providerId);\r
+\r
+ if(topics)\r
+ {\r
+ NS_LOG(DEBUG, "topicList is NULL");\r
+ size_t dimensionSize = (size_t)NSProviderGetTopicListSize(topics);\r
+\r
+ NS_LOG_V(DEBUG, "dimensionSize = %d", (int)dimensionSize);\r
+\r
+ if(!dimensionSize)\r
+ {\r
+ return NS_ERROR;\r
+ }\r
+\r
+ OCRepPayload** payloadTopicArray = (OCRepPayload **) OICMalloc(\r
+ sizeof(OCRepPayload *) * dimensionSize);\r
+\r
+ size_t dimensions[3] = {dimensionSize, 0, 0};\r
+\r
+ for (int i = 0; i < (int)dimensionSize; i++)\r
+ {\r
+ NS_LOG_V(DEBUG, "topicName = %s", topics->topicName);\r
+ NS_LOG_V(DEBUG, "topicState = %d",(int) topics->state);\r
+\r
+ payloadTopicArray[i] = OCRepPayloadCreate();\r
+ OCRepPayloadSetPropString(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_NAME,\r
+ topics->topicName);\r
+ OCRepPayloadSetPropInt(payloadTopicArray[i], NS_ATTRIBUTE_TOPIC_SELECTION,\r
+ (int)topics->state);\r
+\r
+ topics = topics->next;\r
+ }\r
+\r
+\r
+ OCRepPayloadSetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST,\r
+ (const OCRepPayload**)(payloadTopicArray), dimensions);\r
}\r
else\r
{\r
- NS_LOG_V(DEBUG, "Subscribed topic list for consumerId(%s)", consumerId);\r
+ size_t dimensions[3] = {0, 0, 0};\r
+\r
+ OCRepPayloadSetPropObjectArrayAsOwner(payload, NS_ATTRIBUTE_TOPIC_LIST,\r
+ (OCRepPayload **) NULL, dimensions);\r
+ }\r
+\r
+ response.requestHandle = entityHandlerRequest->requestHandle;\r
+ response.resourceHandle = entityHandlerRequest->resource;\r
+ response.persistentBufferFlag = 0;\r
+ response.ehResult = OC_EH_OK;\r
+ response.payload = (OCPayload *) payload;\r
+\r
+ if (OCDoResponse(&response) != OC_STACK_OK)\r
+ {\r
+ NS_LOG(ERROR, "Fail to response topic list");\r
+ return NS_ERROR;\r
+ }\r
+ OCRepPayloadDestroy(payload);\r
+\r
+ NS_LOG(DEBUG, "NSSendTopicList - OUT");\r
+ return NS_OK;\r
+}\r
+\r
+NSResult NSPostConsumerTopics(OCEntityHandlerRequest * entityHandlerRequest)\r
+{\r
+ NS_LOG(DEBUG, "NSPostConsumerTopics() - IN");\r
+\r
+ char * consumerId = NULL;\r
+ OCRepPayload * payload = (OCRepPayload *) entityHandlerRequest->payload;\r
+ OCRepPayloadGetPropString(payload, NS_ATTRIBUTE_CONSUMER_ID, &consumerId);\r
+\r
+ if(!consumerId)\r
+ {\r
+ NS_LOG(DEBUG, "Invalid consumer ID");\r
+ return NS_ERROR;\r
+ }\r
+\r
+ NS_LOG_V(DEBUG, "TOPIC consumer ID = %s", consumerId);\r
+\r
+ consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_CID;\r
+ while(NSStorageDelete(consumerTopicList, consumerId) != NS_FAIL);\r
+ consumerTopicList->cacheType = NS_PROVIDER_CACHE_CONSUMER_TOPIC_NAME;\r
+\r
+ OCRepPayload ** topicListPayload = NULL;\r
+ OCRepPayloadValue * payloadValue = NULL;\r
+ payloadValue = NSPayloadFindValue(payload, NS_ATTRIBUTE_TOPIC_LIST);\r
+ size_t dimensionSize = calcDimTotal(payloadValue->arr.dimensions);\r
+ size_t dimensions[3] = {dimensionSize, 0, 0};\r
+ OCRepPayloadGetPropObjectArray(payload, NS_ATTRIBUTE_TOPIC_LIST, & topicListPayload, dimensions);\r
+\r
+ for(int i = 0; i <(int)dimensionSize; i++)\r
+ {\r
+ char * topicName = NULL;\r
+ int64_t topicState = 0;\r
+\r
+ OCRepPayloadGetPropString(topicListPayload[i], NS_ATTRIBUTE_TOPIC_NAME, &topicName);\r
+ OCRepPayloadGetPropInt(topicListPayload[i], NS_ATTRIBUTE_TOPIC_SELECTION, &topicState);\r
+ NS_LOG_V(DEBUG, "Topic Name(state): %s(%d)", topicName, (int)topicState);\r
+\r
+ if(NS_TOPIC_SUBSCRIBED == (NSTopicState)topicState)\r
+ {\r
+ NSCacheTopicSubData * topicSubData = (NSCacheTopicSubData *)\r
+ OICMalloc(sizeof(NSCacheTopicSubData));\r
+\r
+ OICStrcpy(topicSubData->id, NS_UUID_STRING_SIZE, consumerId);\r
+ topicSubData->topicName = OICStrdup(topicName);\r
+\r
+ NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));\r
+ newObj->data = (NSCacheData *) topicSubData;\r
+ newObj->next = NULL;\r
+ NSStorageWrite(consumerTopicList, newObj);\r
+ }\r
}\r
\r
- NS_LOG(DEBUG, "NSGetTopics() NS_OK");\r
- return topicList;\r
+ NS_LOG(DEBUG, "NSPostConsumerTopics() - OUT");\r
+ return NS_OK;\r
}\r
\r
void * NSTopicSchedule(void * ptr)\r
{\r
case TASK_SEND_TOPICS:\r
NS_LOG(DEBUG, "CASE TASK_SEND_TOPICS : ");\r
+ NSSendTopicList((OCEntityHandlerRequest*) node->taskData);\r
+ NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);\r
+ break;\r
+ case TASK_SUBSCRIBE_TOPIC:\r
+ NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");\r
+ NSCacheElement * newObj = (NSCacheElement *) OICMalloc(sizeof(NSCacheElement));\r
+ newObj->data = node->taskData;\r
+ newObj->next = NULL;\r
+ NSStorageWrite(consumerTopicList, newObj);\r
+ break;\r
+ case TASK_UNSUBSCRIBE_TOPIC:\r
+ NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC : ");\r
+ NSProviderDeleteConsumerTopic(consumerTopicList,\r
+ (NSCacheTopicSubData *) node->taskData);\r
+ NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPIC AFter: ");\r
+ break;\r
+ case TASK_ADD_TOPIC:\r
+ {\r
+ NS_LOG(DEBUG, "CASE TASK_ADD_TOPIC : ");\r
+ NSAddTopics((const char *) node->taskData);\r
+ }\r
+ break;\r
+ case TASK_DELETE_TOPIC:\r
+ {\r
+ NS_LOG(DEBUG, "CASE_TASK_DELETE_TOPIC : ");\r
+ NSDeleteTopics((const char *) node->taskData);\r
+ }\r
+ break;\r
+ case TASK_POST_TOPIC:\r
+ {\r
+ NS_LOG(DEBUG, "TASK_POST_TOPIC : ");\r
+ NSPostConsumerTopics((OCEntityHandlerRequest*) node->taskData);\r
+ NSFreeOCEntityHandlerRequest((OCEntityHandlerRequest*) node->taskData);\r
+ }\r
break;\r
- case TASK_SUBSCRIBE_TOPICS:\r
- NS_LOG(DEBUG, "CASE TASK_SUBSCRIBE_TOPICS : ");\r
+ case TASK_GET_TOPICS:\r
+ {\r
+ NS_LOG(DEBUG, "TASK_GET_TOPICS : ");\r
+ NSTopicSynchronization * topicData = (NSTopicSynchronization *) node->taskData;\r
+ NSTopicLL * topics = NSProviderGetTopicsCacheData(registeredTopicList);\r
+ topicData->topics = topics;\r
+ pthread_cond_signal(&topicData->condition);\r
+ }\r
break;\r
- case TASK_REGISTER_TOPICS:\r
- NS_LOG(DEBUG, "CASE TASK_REGISTER_TOPICS : ");\r
+ case TAST_GET_CONSUMER_TOPICS:\r
+ {\r
+ NS_LOG(DEBUG, "TASK_GET_CONSUMER_TOPICS : ");\r
+ NSTopicSynchronization * topicData = (NSTopicSynchronization *) node->taskData;\r
+ NSTopicLL * topics = NSProviderGetConsumerTopicsCacheData(registeredTopicList,\r
+ consumerTopicList, topicData->consumerId);\r
+ topicData->topics = topics;\r
+ pthread_cond_signal(&topicData->condition);\r
+ }\r
break;\r
default:\r
break;\r