#include "NSConsumerNetworkEventListener.h"
#include "NSConsumerSystem.h"
+#ifdef WITH_MQ
+#include "NSConsumerMQPlugin.h"
+#endif
+
void * NSConsumerMsgHandleThreadFunc(void * handle);
void * NSConsumerMsgPushThreadFunc(void * data);
NSConsumerThread * handle = NULL;
NSConsumerQueue * queue = NULL;
- uint8_t uuid[UUID_SIZE] = {0,};
- char uuidString[UUID_STRING_SIZE] = {0,};
- OCRandomUuidResult randomRet = OCGenerateUuid(uuid);
- NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
- randomRet = OCConvertUuidToString(uuid, uuidString);
- NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
+ char * consumerUuid = (char *)OCGetServerInstanceIDString();
+ NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
- NSSetConsumerId(uuidString);
- NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
+ NSSetConsumerId(consumerUuid);
+ NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
NS_LOG(DEBUG, "listener init");
NSResult ret = NSConsumerListenerInit();
ret = NSConsumerSystemInit();
NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
- NS_LOG(DEBUG, "queue thread init");
- handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
- NS_VERIFY_NOT_NULL(handle, NS_ERROR);
- NSSetMsgHandleThreadHandle(handle);
-
NS_LOG(DEBUG, "create queue");
queue = NSCreateQueue();
NS_VERIFY_NOT_NULL(queue, NS_ERROR);
NSSetMsgHandleQueue(queue);
+ NS_LOG(DEBUG, "queue thread init");
+ handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
+ NS_VERIFY_NOT_NULL(handle, NS_ERROR);
+ NSSetMsgHandleThreadHandle(handle);
+
return NS_OK;
}
NS_VERIFY_NOT_NULL(thread, NS_ERROR);
NSDestroyThreadHandle(thread);
+ NSOICFree(thread);
return NS_OK;
}
NSConsumerListenerTermiate();
NSCancelAllSubscription();
- NSThreadStop(*(NSGetMsgHandleThreadHandle()));
+ NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
+ NSThreadStop(thread);
NSSetMsgHandleThreadHandle(NULL);
- NSDestroyQueue(*(NSGetMsgHandleQueue()));
+ NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
+ NSDestroyQueue(queue);
NSSetMsgHandleQueue(NULL);
NSDestroyInternalCachedList();
if (obj)
{
NSConsumerTaskProcessing((NSTask *)(obj->data));
+ NSOICFree(obj);
}
NSThreadUnlock(queueHandleThread);
void * NSConsumerMsgPushThreadFunc(void * data)
{
- NSThreadDetach();
-
NSConsumerQueueObject * obj = NULL;
NSConsumerQueue * queue = NULL;
}
else
{
- NSPushQueue(queue, obj);
+ NSPushConsumerQueue(queue, obj);
}
NSThreadUnlock(msgHandleThread);
{
NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
- NSRemoveProvider_internal((NSProvider_internal *) task->taskData));
+ NSRemoveProvider_internal((void *) task->taskData));
getTopicTask->nextTask = NULL;
getTopicTask->taskData =
(void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
NSConsumerInternalTaskProcessing(task);
break;
}
+#ifdef WITH_MQ
+ case TASK_MQ_REQ_SUBSCRIBE:
+ {
+ NSConsumerMQTaskProcessing(task);
+ break;
+ }
+#endif
default:
{
NS_LOG(ERROR, "Unknown type of task");