void NSConsumerTaskProcessing(NSTask * task);
-NSConsumerThread ** NSGetMsgHandleThreadHandle()
-{
- static NSConsumerThread * handle = NULL;
- return & handle;
-}
-
-void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
-{
- *(NSGetMsgHandleThreadHandle()) = handle;
-}
+static NSConsumerThread * g_handle = NULL;
-NSConsumerQueue ** NSGetMsgHandleQueue()
-{
- static NSConsumerQueue * queue = NULL;
- return & queue;
-}
+static pthread_mutex_t g_start_mutex = PTHREAD_MUTEX_INITIALIZER;
-void NSSetMsgHandleQueue(NSConsumerQueue * queue)
-{
- *(NSGetMsgHandleQueue()) = queue;
-}
+static NSConsumerQueue * g_queue = NULL;
NSResult NSConsumerMessageHandlerInit()
{
+ pthread_mutex_lock(&g_start_mutex);
+
NSConsumerThread * handle = NULL;
NSConsumerQueue * queue = NULL;
char * consumerUuid = (char *)OCGetServerInstanceIDString();
- NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(consumerUuid, NS_ERROR,
+ pthread_mutex_unlock(&g_start_mutex));
NSSetConsumerId(consumerUuid);
NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
NS_LOG(DEBUG, "listener init");
NSResult ret = NSConsumerListenerInit();
- NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR,
+ pthread_mutex_unlock(&g_start_mutex));
NS_LOG(DEBUG, "system init");
ret = NSConsumerSystemInit();
- NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR,
+ pthread_mutex_unlock(&g_start_mutex));
NS_LOG(DEBUG, "create queue");
queue = NSCreateQueue();
- NS_VERIFY_NOT_NULL(queue, NS_ERROR);
- NSSetMsgHandleQueue(queue);
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(queue, NS_ERROR,
+ pthread_mutex_unlock(&g_start_mutex));
+ g_queue = queue;
NS_LOG(DEBUG, "queue thread init");
handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
- NS_VERIFY_NOT_NULL(handle, NS_ERROR);
- NSSetMsgHandleThreadHandle(handle);
+ if (!handle)
+ {
+ pthread_mutex_unlock(&g_start_mutex);
+ return NS_ERROR;
+ }
+ g_handle = handle;
+ pthread_mutex_unlock(&g_start_mutex);
return NS_OK;
}
void NSConsumerMessageHandlerExit()
{
+ pthread_mutex_lock(&g_start_mutex);
NSConsumerListenerTermiate();
NSCancelAllSubscription();
-
- NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
- NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
-
- NSThreadLock(thread);
+ NSThreadLock(g_handle);
NS_LOG(DEBUG, "Execute remaining task");
- while (!NSIsQueueEmpty(queue))
+ while (!NSIsQueueEmpty(g_queue))
{
- NSConsumerQueueObject * obj = NSPopQueue(queue);
+ NSConsumerQueueObject * obj = NSPopQueue(g_queue);
NS_LOG_V(DEBUG, "Execute remaining task type : %d", ((NSTask *)(obj->data))->taskType);
if (obj)
NSOICFree(obj);
}
}
- NSThreadUnlock(thread);
- NSDestroyQueue(queue);
- NSOICFree(queue);
- NSSetMsgHandleQueue(NULL);
+ NSDestroyQueue(g_queue);
+ NSOICFree(g_queue);
+ g_queue = NULL;
- NSThreadLock(thread);
- NSThreadStop(thread);
- NSSetMsgHandleThreadHandle(NULL);
- NSThreadUnlock(thread);
- NSOICFree(thread);
+ NSThreadUnlock(g_handle);
+ NSThreadStop(g_handle);
+ NSOICFree(g_handle);
+ g_handle = NULL;
NSDestroyInternalCachedList();
+ pthread_mutex_unlock(&g_start_mutex);
}
void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
{
+ (void) threadHandle;
NSConsumerQueueObject * obj = NULL;
NS_LOG(DEBUG, "create thread for consumer message handle");
- NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
- NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
+ NS_VERIFY_NOT_NULL(g_handle, NULL);
while (true)
{
- queueHandleThread = *(NSGetMsgHandleThreadHandle());
- if (NULL == queueHandleThread)
+ pthread_mutex_lock(&g_start_mutex);
+ if (NULL == g_handle)
{
+ pthread_mutex_unlock(&g_start_mutex);
break;
}
- NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
- if (!queue)
+ NSThreadLock(g_handle);
+ if (!g_queue)
{
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
usleep(2000);
- queue = *(NSGetMsgHandleQueue());
continue;
}
- if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
+ if (!g_handle->isStarted && NSIsQueueEmpty(g_queue))
{
NS_LOG(ERROR, "msg handler thread will be terminated");
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
break;
}
- if (NSIsQueueEmpty(queue))
+ if (NSIsQueueEmpty(g_queue))
{
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
usleep(2000);
continue;
}
- NSThreadLock(queueHandleThread);
NS_LOG(DEBUG, "msg handler working");
- queue = *(NSGetMsgHandleQueue());
- obj = NSPopQueue(queue);
+ obj = NSPopQueue(g_queue);
if (obj)
{
NSOICFree(obj);
}
- NSThreadUnlock(queueHandleThread);
+ NSThreadUnlock(g_handle);
+ pthread_mutex_unlock(&g_start_mutex);
}
return NULL;
}
+
void * NSConsumerMsgPushThreadFunc(void * data)
{
NSConsumerQueueObject * obj = NULL;
- NSConsumerQueue * queue = NULL;
NS_LOG(DEBUG, "get queueThread handle");
- NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
- NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
+ if (NULL == g_handle)
+ {
+ NSOICFree(data);
+ return NULL;
+ }
+ NSThreadLock(g_handle);
NS_LOG(DEBUG, "create queue object");
obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
- NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
+ NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL,
+ {
+ NSThreadUnlock(g_handle);
+ NSOICFree(data);
+ });
obj->data = data;
obj->next = NULL;
- NSThreadLock(msgHandleThread);
- queue = *(NSGetMsgHandleQueue());
- if (!queue)
+ if (!g_queue)
{
NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
NSOICFree(data);
}
else
{
- NSPushConsumerQueue(queue, obj);
+ NSPushConsumerQueue(g_queue, obj);
}
- NSThreadUnlock(msgHandleThread);
+ NSThreadUnlock(g_handle);
return NULL;
}