#include "NSConsumerNetworkEventListener.h"
#include "NSConsumerSystem.h"
+#ifdef WITH_MQ
+#include "NSConsumerMQPlugin.h"
+#endif
+
void * NSConsumerMsgHandleThreadFunc(void * handle);
void * NSConsumerMsgPushThreadFunc(void * data);
NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
NSSetConsumerId(consumerUuid);
- NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
+ NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
NS_LOG(DEBUG, "listener init");
NSResult ret = NSConsumerListenerInit();
ret = NSConsumerSystemInit();
NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
- NS_LOG(DEBUG, "queue thread init");
- handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
- NS_VERIFY_NOT_NULL(handle, NS_ERROR);
- NSSetMsgHandleThreadHandle(handle);
-
NS_LOG(DEBUG, "create queue");
queue = NSCreateQueue();
NS_VERIFY_NOT_NULL(queue, NS_ERROR);
NSSetMsgHandleQueue(queue);
+ NS_LOG(DEBUG, "queue thread init");
+ handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
+ NS_VERIFY_NOT_NULL(handle, NS_ERROR);
+ NSSetMsgHandleThreadHandle(handle);
+
return NS_OK;
}
NS_VERIFY_NOT_NULL(thread, NS_ERROR);
NSDestroyThreadHandle(thread);
+ NSOICFree(thread);
return NS_OK;
}
NSConsumerListenerTermiate();
NSCancelAllSubscription();
- NSThreadStop(*(NSGetMsgHandleThreadHandle()));
- NSSetMsgHandleThreadHandle(NULL);
- NSDestroyQueue(*(NSGetMsgHandleQueue()));
+ NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
+ NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
+
+ NSThreadLock(thread);
+ NS_LOG(DEBUG, "Execute remaining task");
+ while (!NSIsQueueEmpty(queue))
+ {
+ NSConsumerQueueObject * obj = NSPopQueue(queue);
+ NS_LOG_V(DEBUG, "Execute remaining task type : %d", ((NSTask *)(obj->data))->taskType);
+
+ if (obj)
+ {
+ NSConsumerTaskProcessing((NSTask *)(obj->data));
+ NSOICFree(obj);
+ }
+ }
+ NSThreadUnlock(thread);
+
+ NSDestroyQueue(queue);
+ NSOICFree(queue);
NSSetMsgHandleQueue(NULL);
+ NSThreadLock(thread);
+ NSThreadStop(thread);
+ NSSetMsgHandleThreadHandle(NULL);
+ NSThreadUnlock(thread);
+ NSOICFree(thread);
+
NSDestroyInternalCachedList();
}
void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
{
- NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
NSConsumerQueueObject * obj = NULL;
NS_LOG(DEBUG, "create thread for consumer message handle");
while (true)
{
+ queueHandleThread = *(NSGetMsgHandleThreadHandle());
+ if (NULL == queueHandleThread)
+ {
+ break;
+ }
+
+ NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
if (!queue)
{
- queue = *(NSGetMsgHandleQueue());
usleep(2000);
+ queue = *(NSGetMsgHandleQueue());
continue;
}
NSThreadLock(queueHandleThread);
NS_LOG(DEBUG, "msg handler working");
+ queue = *(NSGetMsgHandleQueue());
obj = NSPopQueue(queue);
if (obj)
{
NSConsumerTaskProcessing((NSTask *)(obj->data));
+ NSOICFree(obj);
}
NSThreadUnlock(queueHandleThread);
void * NSConsumerMsgPushThreadFunc(void * data)
{
- NSThreadDetach();
-
NSConsumerQueueObject * obj = NULL;
NSConsumerQueue * queue = NULL;
void NSConsumerTaskProcessing(NSTask * task)
{
+ NS_VERIFY_NOT_NULL_V(task);
switch (task->taskType)
{
case TASK_EVENT_CONNECTED:
{
NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
- NSRemoveProvider_internal((NSProvider_internal *) task->taskData));
+ {
+ NSRemoveProvider_internal((void *) task->taskData);
+ NSOICFree(task);
+ });
getTopicTask->nextTask = NULL;
getTopicTask->taskData =
(void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
NSConsumerInternalTaskProcessing(task);
break;
}
+#ifdef WITH_MQ
+ case TASK_MQ_REQ_SUBSCRIBE:
+ {
+ NSConsumerMQTaskProcessing(task);
+ break;
+ }
+#endif
default:
{
NS_LOG(ERROR, "Unknown type of task");