Fix to prevent of crash on the unit test.
[contrib/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
index 13a33bb..c299b31 100644 (file)
 #include "NSConsumerNetworkEventListener.h"
 #include "NSConsumerSystem.h"
 
+#ifdef WITH_MQ
+#include "NSConsumerMQPlugin.h"
+#endif
+
 void * NSConsumerMsgHandleThreadFunc(void * handle);
 
 void * NSConsumerMsgPushThreadFunc(void * data);
@@ -78,7 +82,7 @@ NSResult NSConsumerMessageHandlerInit()
     NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
 
     NSSetConsumerId(consumerUuid);
-    NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
+    NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
 
     NS_LOG(DEBUG, "listener init");
     NSResult ret = NSConsumerListenerInit();
@@ -88,16 +92,16 @@ NSResult NSConsumerMessageHandlerInit()
     ret = NSConsumerSystemInit();
     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
 
-    NS_LOG(DEBUG, "queue thread init");
-    handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
-    NS_VERIFY_NOT_NULL(handle, NS_ERROR);
-    NSSetMsgHandleThreadHandle(handle);
-
     NS_LOG(DEBUG, "create queue");
     queue = NSCreateQueue();
     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
     NSSetMsgHandleQueue(queue);
 
+    NS_LOG(DEBUG, "queue thread init");
+    handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
+    NS_VERIFY_NOT_NULL(handle, NS_ERROR);
+    NSSetMsgHandleThreadHandle(handle);
+
     return NS_OK;
 }
 
@@ -107,6 +111,7 @@ NSResult NSConsumerPushEvent(NSTask * task)
     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
 
     NSDestroyThreadHandle(thread);
+    NSOICFree(thread);
 
     return NS_OK;
 }
@@ -117,18 +122,40 @@ void NSConsumerMessageHandlerExit()
     NSConsumerListenerTermiate();
     NSCancelAllSubscription();
 
-    NSThreadStop(*(NSGetMsgHandleThreadHandle()));
-    NSSetMsgHandleThreadHandle(NULL);
 
-    NSDestroyQueue(*(NSGetMsgHandleQueue()));
+    NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
+    NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
+
+    NSThreadLock(thread);
+    NS_LOG(DEBUG, "Execute remaining task");
+    while (!NSIsQueueEmpty(queue))
+    {
+        NSConsumerQueueObject * obj = NSPopQueue(queue);
+        NS_LOG_V(DEBUG, "Execute remaining task type : %d", ((NSTask *)(obj->data))->taskType);
+
+        if (obj)
+        {
+            NSConsumerTaskProcessing((NSTask *)(obj->data));
+            NSOICFree(obj);
+        }
+    }
+    NSThreadUnlock(thread);
+
+    NSDestroyQueue(queue);
+    NSOICFree(queue);
     NSSetMsgHandleQueue(NULL);
 
+    NSThreadLock(thread);
+    NSThreadStop(thread);
+    NSSetMsgHandleThreadHandle(NULL);
+    NSThreadUnlock(thread);
+    NSOICFree(thread);
+
     NSDestroyInternalCachedList();
 }
 
 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
 {
-    NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
     NSConsumerQueueObject * obj = NULL;
 
     NS_LOG(DEBUG, "create thread for consumer message handle");
@@ -137,10 +164,17 @@ void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
 
     while (true)
     {
+        queueHandleThread = *(NSGetMsgHandleThreadHandle());
+        if (NULL == queueHandleThread)
+        {
+            break;
+        }
+
+        NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
         if (!queue)
         {
-            queue = *(NSGetMsgHandleQueue());
             usleep(2000);
+            queue = *(NSGetMsgHandleQueue());
             continue;
         }
 
@@ -158,11 +192,13 @@ void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
 
         NSThreadLock(queueHandleThread);
         NS_LOG(DEBUG, "msg handler working");
+        queue = *(NSGetMsgHandleQueue());
         obj = NSPopQueue(queue);
 
         if (obj)
         {
             NSConsumerTaskProcessing((NSTask *)(obj->data));
+            NSOICFree(obj);
         }
 
         NSThreadUnlock(queueHandleThread);
@@ -174,8 +210,6 @@ void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
 
 void * NSConsumerMsgPushThreadFunc(void * data)
 {
-    NSThreadDetach();
-
     NSConsumerQueueObject * obj = NULL;
     NSConsumerQueue * queue = NULL;
 
@@ -238,6 +272,7 @@ void NSProviderDeletedPostClean(
 
 void NSConsumerTaskProcessing(NSTask * task)
 {
+    NS_VERIFY_NOT_NULL_V(task);
     switch (task->taskType)
     {
         case TASK_EVENT_CONNECTED:
@@ -317,7 +352,10 @@ void NSConsumerTaskProcessing(NSTask * task)
         {
             NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
-                        NSRemoveProvider_internal((NSProvider_internal *) task->taskData));
+            {
+                NSRemoveProvider_internal((void *) task->taskData);
+                NSOICFree(task);
+            });
             getTopicTask->nextTask = NULL;
             getTopicTask->taskData =
                     (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
@@ -326,6 +364,13 @@ void NSConsumerTaskProcessing(NSTask * task)
             NSConsumerInternalTaskProcessing(task);
             break;
         }
+#ifdef WITH_MQ
+        case TASK_MQ_REQ_SUBSCRIBE:
+        {
+            NSConsumerMQTaskProcessing(task);
+            break;
+        }
+#endif
         default:
         {
             NS_LOG(ERROR, "Unknown type of task");