#include "NSConsumerMQPlugin.h"
#endif
-pthread_mutex_t NSConsumerQueueMutex;
-
void * NSConsumerMsgHandleThreadFunc(void * handle);
void * NSConsumerMsgPushThreadFunc(void * data);
void NSConsumerTaskProcessing(NSTask * task);
-NSConsumerThread ** NSGetMsgHandleThreadHandle()
-{
- static NSConsumerThread * handle = NULL;
- return & handle;
-}
+static NSConsumerThread * g_handle = NULL;
-void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
-{
- *(NSGetMsgHandleThreadHandle()) = handle;
-}
+static pthread_mutex_t g_start_mutex = PTHREAD_MUTEX_INITIALIZER;
-NSConsumerQueue ** NSGetMsgHandleQueue()
-{
- static NSConsumerQueue * queue = NULL;
- return & queue;
-}
+static NSConsumerQueue * g_queue = NULL;
-void NSSetMsgHandleQueue(NSConsumerQueue * queue)
+NSResult NSConsumerMessageHandlerInit(void)
{
- *(NSGetMsgHandleQueue()) = queue;
-}
+ pthread_mutex_lock(&g_start_mutex);
-NSResult NSConsumerMessageHandlerInit()
-{
NSConsumerThread * handle = NULL;
NSConsumerQueue * queue = NULL;
char * consumerUuid = (char *)OCGetServerInstanceIDString();
- NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(consumerUuid, NS_ERROR,
+ pthread_mutex_unlock(&g_start_mutex));
NSSetConsumerId(consumerUuid);
NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
NS_LOG(DEBUG, "listener init");
NSResult ret = NSConsumerListenerInit();
- NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR,
+ pthread_mutex_unlock(&g_start_mutex));
NS_LOG(DEBUG, "system init");
ret = NSConsumerSystemInit();
- NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
-
- NS_LOG(DEBUG, "mutex init");
- pthread_mutex_init(&NSConsumerQueueMutex, NULL);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR,
+ pthread_mutex_unlock(&g_start_mutex));
NS_LOG(DEBUG, "create queue");
queue = NSCreateQueue();
- NS_VERIFY_NOT_NULL(queue, NS_ERROR);
- NSSetMsgHandleQueue(queue);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(queue, NS_ERROR,
+ pthread_mutex_unlock(&g_start_mutex));
+ g_queue = queue;
NS_LOG(DEBUG, "queue thread init");
handle = NSJoinableThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
- NS_VERIFY_NOT_NULL(handle, NS_ERROR);
- NSSetMsgHandleThreadHandle(handle);
+ if (!handle)
+ {
+ pthread_mutex_unlock(&g_start_mutex);
+ return NS_ERROR;
+ }
+ g_handle = handle;
+ pthread_mutex_unlock(&g_start_mutex);
return NS_OK;
}
NSConsumerListenerTermiate();
NSCancelAllSubscription();
- pthread_mutex_lock(&NSConsumerQueueMutex);
- NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
- NSThreadStop(thread);
- NSSetMsgHandleThreadHandle(NULL);
+ NSThreadStop(g_handle);
+ NSOICFree(g_handle);
+ g_handle = NULL;
- NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
- NSDestroyQueue(queue);
- NSSetMsgHandleQueue(NULL);
- pthread_mutex_unlock(&NSConsumerQueueMutex);
- pthread_mutex_destroy(&NSConsumerQueueMutex);
+ NSDestroyQueue(g_queue);
+ NSOICFree(g_queue);
+ g_queue = NULL;
NSDestroyInternalCachedList();
}
void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
{
- NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
+ (void) threadHandle;
NSConsumerQueueObject * obj = NULL;
NS_LOG(DEBUG, "create thread for consumer message handle");
- pthread_mutex_lock(&NSConsumerQueueMutex);
- NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
- pthread_mutex_unlock(&NSConsumerQueueMutex);
- NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
+ NS_VERIFY_NOT_NULL(g_handle, NULL);
while (true)
{
- if (!queue)
+ pthread_mutex_lock(&g_start_mutex);
+ if (NULL == g_handle)
{
- queue = *(NSGetMsgHandleQueue());
+ pthread_mutex_unlock(&g_start_mutex);
+ break;
+ }
+
+ NSThreadLock(g_handle);
+ if (!g_queue)
+ {
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
usleep(2000);
continue;
}
- pthread_mutex_lock(&NSConsumerQueueMutex);
- if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
+ if (!g_handle->isStarted && NSIsQueueEmpty(g_queue))
{
- pthread_mutex_unlock(&NSConsumerQueueMutex);
NS_LOG(ERROR, "msg handler thread will be terminated");
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
break;
}
- if (NSIsQueueEmpty(queue))
+ if (NSIsQueueEmpty(g_queue))
{
- pthread_mutex_unlock(&NSConsumerQueueMutex);
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
usleep(2000);
continue;
}
- NSThreadLock(queueHandleThread);
+
NS_LOG(DEBUG, "msg handler working");
- obj = NSPopQueue(queue);
+ obj = NSPopQueue(g_queue);
if (obj)
{
NSOICFree(obj);
}
- NSThreadUnlock(queueHandleThread);
- pthread_mutex_unlock(&NSConsumerQueueMutex);
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
+
}
return NULL;
}
+
void * NSConsumerMsgPushThreadFunc(void * data)
{
+ pthread_mutex_lock(&g_start_mutex);
NSConsumerQueueObject * obj = NULL;
- NSConsumerQueue * queue = NULL;
NS_LOG(DEBUG, "get queueThread handle");
- pthread_mutex_lock(&NSConsumerQueueMutex);
- NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
- NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
+ if (NULL == g_handle)
+ {
+ NSOICFree(data);
+ pthread_mutex_unlock(&g_start_mutex);
+ return NULL;
+ }
+ NSThreadLock(g_handle);
NS_LOG(DEBUG, "create queue object");
obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
- NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL,
+ {
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
+ NSOICFree(data);
+ });
obj->data = data;
obj->next = NULL;
- NSThreadLock(msgHandleThread);
- queue = *(NSGetMsgHandleQueue());
- if (!queue)
+ if (!g_queue)
{
NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
NSOICFree(data);
}
else
{
- if (msgHandleThread != NULL && msgHandleThread->isStarted)
- {
- NSPushConsumerQueue(queue, obj);
- }
- else
- {
- NSOICFree(data);
- NSOICFree(obj);
- }
+ NSPushConsumerQueue(g_queue, obj);
}
- NSThreadUnlock(msgHandleThread);
- pthread_mutex_unlock(&NSConsumerQueueMutex);
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
return NULL;
}
void NSConsumerTaskProcessing(NSTask * task)
{
+ NS_VERIFY_NOT_NULL_V(task);
switch (task->taskType)
{
case TASK_EVENT_CONNECTED:
{
NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
- NSRemoveProvider_internal((void *) task->taskData));
+ {
+ NSRemoveProvider_internal((void *) task->taskData);
+ NSOICFree(task);
+ });
getTopicTask->nextTask = NULL;
getTopicTask->taskData =
(void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);