#include "NSConsumerNetworkEventListener.h"
#include "NSConsumerSystem.h"
+#ifdef WITH_MQ
+#include "NSConsumerMQPlugin.h"
+#endif
+
void * NSConsumerMsgHandleThreadFunc(void * handle);
void * NSConsumerMsgPushThreadFunc(void * data);
NSConsumerThread * handle = NULL;
NSConsumerQueue * queue = NULL;
- uint8_t uuid[UUID_SIZE] = {0,};
- char uuidString[UUID_STRING_SIZE] = {0,};
- OCRandomUuidResult randomRet = OCGenerateUuid(uuid);
- NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
- randomRet = OCConvertUuidToString(uuid, uuidString);
- NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
+ char * consumerUuid = (char *)OCGetServerInstanceIDString();
+ NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
- NSSetConsumerId(uuidString);
- NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
+ NSSetConsumerId(consumerUuid);
+ NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
NS_LOG(DEBUG, "listener init");
NSResult ret = NSConsumerListenerInit();
ret = NSConsumerSystemInit();
NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
- NS_LOG(DEBUG, "queue thread init");
- handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
- NS_VERIFY_NOT_NULL(handle, NS_ERROR);
- NSSetMsgHandleThreadHandle(handle);
-
NS_LOG(DEBUG, "create queue");
queue = NSCreateQueue();
NS_VERIFY_NOT_NULL(queue, NS_ERROR);
NSSetMsgHandleQueue(queue);
+ NS_LOG(DEBUG, "queue thread init");
+ handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
+ NS_VERIFY_NOT_NULL(handle, NS_ERROR);
+ NSSetMsgHandleThreadHandle(handle);
+
return NS_OK;
}
NS_VERIFY_NOT_NULL(thread, NS_ERROR);
NSDestroyThreadHandle(thread);
+ NSOICFree(thread);
return NS_OK;
}
NSConsumerListenerTermiate();
NSCancelAllSubscription();
- NSThreadStop(*(NSGetMsgHandleThreadHandle()));
- NSDestroyQueue(*(NSGetMsgHandleQueue()));
+
+ NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
+ NSThreadStop(thread);
+ NSSetMsgHandleThreadHandle(NULL);
+
+ NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
+ NSDestroyQueue(queue);
NSSetMsgHandleQueue(NULL);
- NSDestroyMessageCacheList();
- NSDestroyProviderCacheList();
+ NSDestroyInternalCachedList();
}
void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
if (obj)
{
NSConsumerTaskProcessing((NSTask *)(obj->data));
+ NSOICFree(obj);
}
NSThreadUnlock(queueHandleThread);
void * NSConsumerMsgPushThreadFunc(void * data)
{
- NSThreadDetach();
-
NSConsumerQueueObject * obj = NULL;
NSConsumerQueue * queue = NULL;
}
else
{
- NSPushQueue(queue, obj);
+ NSPushConsumerQueue(queue, obj);
}
NSThreadUnlock(msgHandleThread);
}
case TASK_SEND_SYNCINFO:
case TASK_CONSUMER_REQ_TOPIC_LIST:
- {
- NSConsumerCommunicationTaskProcessing(task);
- break;
- }
- case TASK_CONSUMER_GET_TOPIC_LIST:
case TASK_CONSUMER_SELECT_TOPIC_LIST:
{
- NSProvider * provider = (NSProvider *)task->taskData;
- NSProvider_internal * prov =
- NSConsumerFindNSProvider(provider->providerId);
- NS_VERIFY_NOT_NULL_V(prov);
- if (task->taskType == TASK_CONSUMER_SELECT_TOPIC_LIST)
- {
- NSRemoveTopicLL(prov->topicLL);
- prov->topicLL = NSCopyTopicLL((NSTopicLL *)provider->topicLL);
- }
-
- NSTask * topTask = NSMakeTask(task->taskType, prov);
- NS_VERIFY_NOT_NULL_V(topTask);
- NSConsumerCommunicationTaskProcessing(topTask);
-
- NSRemoveProvider((NSProvider *)task->taskData);
- NSOICFree(task);
+ NSConsumerCommunicationTaskProcessing(task);
break;
}
case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
}
case TASK_RECV_SYNCINFO:
case TASK_CONSUMER_RECV_MESSAGE:
- case TASK_CONSUMER_PROVIDER_DISCOVERED:
case TASK_CONSUMER_SENT_REQ_OBSERVE:
case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
case TASK_MAKE_SYNCINFO:
NSConsumerInternalTaskProcessing(task);
break;
}
+ case TASK_CONSUMER_PROVIDER_DISCOVERED:
+ {
+ NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
+ NSRemoveProvider_internal((void *) task->taskData));
+ getTopicTask->nextTask = NULL;
+ getTopicTask->taskData =
+ (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
+ getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
+ NSConsumerCommunicationTaskProcessing(getTopicTask);
+ NSConsumerInternalTaskProcessing(task);
+ break;
+ }
+#ifdef WITH_MQ
+ case TASK_MQ_REQ_SUBSCRIBE:
+ {
+ NSConsumerMQTaskProcessing(task);
+ break;
+ }
+#endif
default:
{
NS_LOG(ERROR, "Unknown type of task");
}
}
-NSMessage * NSConsumerFindNSMessage(const char* messageId)
-{
- NS_VERIFY_NOT_NULL(messageId, NULL);
-
- return NSMessageCacheFind(messageId);
-}
-
NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
{
NS_VERIFY_NOT_NULL(providerId, NULL);