#include "NSConsumerMQPlugin.h"
#endif
+pthread_mutex_t NSConsumerQueueMutex;
+
void * NSConsumerMsgHandleThreadFunc(void * handle);
void * NSConsumerMsgPushThreadFunc(void * data);
ret = NSConsumerSystemInit();
NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
+ NS_LOG(DEBUG, "mutex init");
+ pthread_mutex_init(&NSConsumerQueueMutex, NULL);
+
NS_LOG(DEBUG, "create queue");
queue = NSCreateQueue();
NS_VERIFY_NOT_NULL(queue, NS_ERROR);
void NSConsumerMessageHandlerExit()
{
-
NSConsumerListenerTermiate();
NSCancelAllSubscription();
+ pthread_mutex_lock(&NSConsumerQueueMutex);
NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
NSThreadStop(thread);
NSSetMsgHandleThreadHandle(NULL);
NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
NSDestroyQueue(queue);
NSSetMsgHandleQueue(NULL);
+ pthread_mutex_unlock(&NSConsumerQueueMutex);
+ pthread_mutex_destroy(&NSConsumerQueueMutex);
NSDestroyInternalCachedList();
}
NSConsumerQueueObject * obj = NULL;
NS_LOG(DEBUG, "create thread for consumer message handle");
+ pthread_mutex_lock(&NSConsumerQueueMutex);
NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
+ pthread_mutex_unlock(&NSConsumerQueueMutex);
NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
while (true)
continue;
}
+ pthread_mutex_lock(&NSConsumerQueueMutex);
if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
{
+ pthread_mutex_unlock(&NSConsumerQueueMutex);
NS_LOG(ERROR, "msg handler thread will be terminated");
break;
}
if (NSIsQueueEmpty(queue))
{
+ pthread_mutex_unlock(&NSConsumerQueueMutex);
usleep(2000);
continue;
}
-
NSThreadLock(queueHandleThread);
NS_LOG(DEBUG, "msg handler working");
obj = NSPopQueue(queue);
}
NSThreadUnlock(queueHandleThread);
-
+ pthread_mutex_unlock(&NSConsumerQueueMutex);
}
return NULL;
NSConsumerQueue * queue = NULL;
NS_LOG(DEBUG, "get queueThread handle");
+ pthread_mutex_lock(&NSConsumerQueueMutex);
NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
}
NSThreadUnlock(msgHandleThread);
+ pthread_mutex_unlock(&NSConsumerQueueMutex);
return NULL;
}