#include "NSConsumerNetworkEventListener.h"
#include "NSConsumerSystem.h"
+#ifdef WITH_MQ
+#include "NSConsumerMQPlugin.h"
+#endif
+
void * NSConsumerMsgHandleThreadFunc(void * handle);
void * NSConsumerMsgPushThreadFunc(void * data);
NSConsumerThread * handle = NULL;
NSConsumerQueue * queue = NULL;
- uint8_t uuid[UUID_SIZE] = {0,};
- char uuidString[UUID_STRING_SIZE] = {0,};
- OCRandomUuidResult randomRet = OCGenerateUuid(uuid);
- NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
- randomRet = OCConvertUuidToString(uuid, uuidString);
- NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
+ char * consumerUuid = (char *)OCGetServerInstanceIDString();
+ NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
- NSSetConsumerId(uuidString);
- NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
+ NSSetConsumerId(consumerUuid);
+ NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
NS_LOG(DEBUG, "listener init");
NSResult ret = NSConsumerListenerInit();
ret = NSConsumerSystemInit();
NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
- NS_LOG(DEBUG, "queue thread init");
- handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
- NS_VERIFY_NOT_NULL(handle, NS_ERROR);
- NSSetMsgHandleThreadHandle(handle);
-
NS_LOG(DEBUG, "create queue");
queue = NSCreateQueue();
NS_VERIFY_NOT_NULL(queue, NS_ERROR);
NSSetMsgHandleQueue(queue);
+ NS_LOG(DEBUG, "queue thread init");
+ handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
+ NS_VERIFY_NOT_NULL(handle, NS_ERROR);
+ NSSetMsgHandleThreadHandle(handle);
+
return NS_OK;
}
NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
NS_VERIFY_NOT_NULL(thread, NS_ERROR);
+ NSDestroyThreadHandle(thread);
+ NSOICFree(thread);
+
return NS_OK;
}
NSConsumerListenerTermiate();
NSCancelAllSubscription();
- NSThreadStop(*(NSGetMsgHandleThreadHandle()));
- NSDestroyQueue(*(NSGetMsgHandleQueue()));
+
+ NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
+ NSThreadStop(thread);
+ NSSetMsgHandleThreadHandle(NULL);
+
+ NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
+ NSDestroyQueue(queue);
NSSetMsgHandleQueue(NULL);
- NSDestroyMessageCacheList();
- NSDestroyProviderCacheList();
+ NSDestroyInternalCachedList();
}
void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
if (obj)
{
NSConsumerTaskProcessing((NSTask *)(obj->data));
+ NSOICFree(obj);
}
NSThreadUnlock(queueHandleThread);
NS_LOG(DEBUG, "get queueThread handle");
NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
- NS_VERIFY_NOT_NULL(msgHandleThread, NULL);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
NS_LOG(DEBUG, "create queue object");
obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
- NS_VERIFY_NOT_NULL(obj, NULL);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
obj->data = data;
obj->next = NULL;
}
else
{
- NSPushQueue(queue, obj);
+ NSPushConsumerQueue(queue, obj);
}
NSThreadUnlock(msgHandleThread);
return NULL;
}
+void NSProviderDeletedPostClean(
+ NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
+{
+ if (task && task->taskData)
+ {
+ if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
+ {
+ NSRemoveProvider((NSProvider *) task->taskData);
+ }
+ else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
+ {
+ NSOICFree(task->taskData);
+ }
+ NSOICFree(task);
+ }
+
+ if (prov1)
+ {
+ NSRemoveProvider_internal(prov1);
+ }
+
+ if (prov2)
+ {
+ NSRemoveProvider_internal(prov2);
+ }
+}
+
void NSConsumerTaskProcessing(NSTask * task)
{
switch (task->taskType)
break;
}
case TASK_CONSUMER_REQ_SUBSCRIBE:
+ {
+ NSProvider_internal * prov =
+ NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
+ NS_VERIFY_NOT_NULL_V(prov);
+ NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
+ NS_VERIFY_NOT_NULL_V(subTask);
+ NSConsumerCommunicationTaskProcessing(subTask);
+
+ NSRemoveProvider((NSProvider *)task->taskData);
+ NSOICFree(task);
+ break;
+ }
case TASK_SEND_SYNCINFO:
+ case TASK_CONSUMER_REQ_TOPIC_LIST:
+ case TASK_CONSUMER_SELECT_TOPIC_LIST:
{
NSConsumerCommunicationTaskProcessing(task);
break;
}
case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
+ case TASK_CONSUMER_PROVIDER_DELETED:
{
- NSProvider_internal * data = NSCopyProvider((NSProvider_internal *)task->taskData);
- NS_VERIFY_NOT_NULL_V(data);
+ NSProvider_internal * data = NULL;
+
+ if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
+ {
+ data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+ data, NSProviderDeletedPostClean(task, NULL, NULL));
+ }
+ else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
+ {
+ data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+ data, NSProviderDeletedPostClean(task, NULL, NULL));
+ }
+
+ NSProvider_internal * data2 = NSCopyProvider_internal(data);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+ data2, NSProviderDeletedPostClean(task, data, NULL));
+
NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
- NS_VERIFY_NOT_NULL_V(conTask);
- NSConsumerCommunicationTaskProcessing(task);
- NSConsumerInternalTaskProcessing(conTask);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+ conTask, NSProviderDeletedPostClean(task, data, data2));
+ NSConsumerCommunicationTaskProcessing(conTask);
+
+ NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+ conTask, NSProviderDeletedPostClean(task, NULL, data2));
+ NSConsumerInternalTaskProcessing(conTask2);
+
+ NSProviderDeletedPostClean(task, NULL, NULL);
break;
}
case TASK_RECV_SYNCINFO:
case TASK_CONSUMER_RECV_MESSAGE:
- case TASK_CONSUMER_PROVIDER_DISCOVERED:
- case TASK_CONSUMER_RECV_SUBSCRIBE_CONFIRMED:
+ case TASK_CONSUMER_SENT_REQ_OBSERVE:
+ case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
case TASK_MAKE_SYNCINFO:
+ case TASK_CONSUMER_REQ_TOPIC_URI:
+ case TASK_CONSUMER_RECV_TOPIC_LIST:
{
NSConsumerInternalTaskProcessing(task);
break;
}
+ case TASK_CONSUMER_PROVIDER_DISCOVERED:
+ {
+ NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
+ NSRemoveProvider_internal((void *) task->taskData));
+ getTopicTask->nextTask = NULL;
+ getTopicTask->taskData =
+ (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
+ getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
+ NSConsumerCommunicationTaskProcessing(getTopicTask);
+ NSConsumerInternalTaskProcessing(task);
+ break;
+ }
+#ifdef WITH_MQ
+ case TASK_MQ_REQ_SUBSCRIBE:
+ {
+ NSConsumerMQTaskProcessing(task);
+ break;
+ }
+#endif
default:
{
NS_LOG(ERROR, "Unknown type of task");
}
}
-NSMessage * NSConsumerFindNSMessage(const char* messageId)
-{
- NS_VERIFY_NOT_NULL(messageId, NULL);
-
- return NSMessageCacheFind(messageId);
-}
-
NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
{
NS_VERIFY_NOT_NULL(providerId, NULL);