replace : iotivity -> iotivity-sec
[platform/upstream/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
index 6f4b92d..2e13b59 100644 (file)
 #include "NSConsumerNetworkEventListener.h"
 #include "NSConsumerSystem.h"
 
+#ifdef WITH_MQ
+#include "NSConsumerMQPlugin.h"
+#endif
+
 void * NSConsumerMsgHandleThreadFunc(void * handle);
 
 void * NSConsumerMsgPushThreadFunc(void * data);
@@ -74,15 +78,11 @@ NSResult NSConsumerMessageHandlerInit()
     NSConsumerThread * handle = NULL;
     NSConsumerQueue * queue = NULL;
 
-    uint8_t uuid[UUID_SIZE] = {0,};
-    char uuidString[UUID_STRING_SIZE] = {0,};
-    OCRandomUuidResult randomRet = OCGenerateUuid(uuid);
-    NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
-    randomRet = OCConvertUuidToString(uuid, uuidString);
-    NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
+    char * consumerUuid = (char *)OCGetServerInstanceIDString();
+    NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
 
-    NSSetConsumerId(uuidString);
-    NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
+    NSSetConsumerId(consumerUuid);
+    NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
 
     NS_LOG(DEBUG, "listener init");
     NSResult ret = NSConsumerListenerInit();
@@ -92,16 +92,16 @@ NSResult NSConsumerMessageHandlerInit()
     ret = NSConsumerSystemInit();
     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
 
-    NS_LOG(DEBUG, "queue thread init");
-    handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
-    NS_VERIFY_NOT_NULL(handle, NS_ERROR);
-    NSSetMsgHandleThreadHandle(handle);
-
     NS_LOG(DEBUG, "create queue");
     queue = NSCreateQueue();
     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
     NSSetMsgHandleQueue(queue);
 
+    NS_LOG(DEBUG, "queue thread init");
+    handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
+    NS_VERIFY_NOT_NULL(handle, NS_ERROR);
+    NSSetMsgHandleThreadHandle(handle);
+
     return NS_OK;
 }
 
@@ -111,6 +111,7 @@ NSResult NSConsumerPushEvent(NSTask * task)
     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
 
     NSDestroyThreadHandle(thread);
+    NSOICFree(thread);
 
     return NS_OK;
 }
@@ -120,12 +121,16 @@ void NSConsumerMessageHandlerExit()
 
     NSConsumerListenerTermiate();
     NSCancelAllSubscription();
-    NSThreadStop(*(NSGetMsgHandleThreadHandle()));
-    NSDestroyQueue(*(NSGetMsgHandleQueue()));
+
+    NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
+    NSThreadStop(thread);
+    NSSetMsgHandleThreadHandle(NULL);
+
+    NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
+    NSDestroyQueue(queue);
     NSSetMsgHandleQueue(NULL);
 
-    NSDestroyMessageCacheList();
-    NSDestroyProviderCacheList();
+    NSDestroyInternalCachedList();
 }
 
 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
@@ -165,6 +170,7 @@ void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
         if (obj)
         {
             NSConsumerTaskProcessing((NSTask *)(obj->data));
+            NSOICFree(obj);
         }
 
         NSThreadUnlock(queueHandleThread);
@@ -176,8 +182,6 @@ void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
 
 void * NSConsumerMsgPushThreadFunc(void * data)
 {
-    NSThreadDetach();
-
     NSConsumerQueueObject * obj = NULL;
     NSConsumerQueue * queue = NULL;
 
@@ -203,7 +207,7 @@ void * NSConsumerMsgPushThreadFunc(void * data)
     }
     else
     {
-        NSPushQueue(queue, obj);
+        NSPushConsumerQueue(queue, obj);
     }
 
     NSThreadUnlock(msgHandleThread);
@@ -211,6 +215,33 @@ void * NSConsumerMsgPushThreadFunc(void * data)
     return NULL;
 }
 
+void NSProviderDeletedPostClean(
+        NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
+{
+    if (task && task->taskData)
+    {
+        if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
+        {
+            NSRemoveProvider((NSProvider *) task->taskData);
+        }
+        else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
+        {
+            NSOICFree(task->taskData);
+        }
+        NSOICFree(task);
+    }
+
+    if (prov1)
+    {
+        NSRemoveProvider_internal(prov1);
+    }
+
+    if (prov2)
+    {
+        NSRemoveProvider_internal(prov2);
+    }
+}
+
 void NSConsumerTaskProcessing(NSTask * task)
 {
     switch (task->taskType)
@@ -223,27 +254,63 @@ void NSConsumerTaskProcessing(NSTask * task)
             break;
         }
         case TASK_CONSUMER_REQ_SUBSCRIBE:
+        {
+            NSProvider_internal * prov =
+                    NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
+            NS_VERIFY_NOT_NULL_V(prov);
+            NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
+            NS_VERIFY_NOT_NULL_V(subTask);
+            NSConsumerCommunicationTaskProcessing(subTask);
+
+            NSRemoveProvider((NSProvider *)task->taskData);
+            NSOICFree(task);
+            break;
+        }
         case TASK_SEND_SYNCINFO:
         case TASK_CONSUMER_REQ_TOPIC_LIST:
-        case TASK_CONSUMER_GET_TOPIC_LIST:
         case TASK_CONSUMER_SELECT_TOPIC_LIST:
         {
             NSConsumerCommunicationTaskProcessing(task);
             break;
         }
         case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
+        case TASK_CONSUMER_PROVIDER_DELETED:
         {
-            NSProvider_internal * data = NSCopyProvider((NSProvider_internal *)task->taskData);
-            NS_VERIFY_NOT_NULL_V(data);
+            NSProvider_internal * data = NULL;
+
+            if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
+            {
+                data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
+                NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+                        data, NSProviderDeletedPostClean(task, NULL, NULL));
+            }
+            else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
+            {
+                data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
+                NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+                        data, NSProviderDeletedPostClean(task, NULL, NULL));
+            }
+
+            NSProvider_internal * data2 = NSCopyProvider_internal(data);
+            NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+                        data2, NSProviderDeletedPostClean(task, data, NULL));
+
             NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
-            NS_VERIFY_NOT_NULL_V(conTask);
-            NSConsumerCommunicationTaskProcessing(task);
-            NSConsumerInternalTaskProcessing(conTask);
+            NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+                        conTask, NSProviderDeletedPostClean(task, data, data2));
+            NSConsumerCommunicationTaskProcessing(conTask);
+
+            NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
+            NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
+                        conTask, NSProviderDeletedPostClean(task, NULL, data2));
+            NSConsumerInternalTaskProcessing(conTask2);
+
+            NSProviderDeletedPostClean(task, NULL, NULL);
             break;
         }
         case TASK_RECV_SYNCINFO:
         case TASK_CONSUMER_RECV_MESSAGE:
-        case TASK_CONSUMER_PROVIDER_DISCOVERED:
+        case TASK_CONSUMER_SENT_REQ_OBSERVE:
         case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
         case TASK_MAKE_SYNCINFO:
         case TASK_CONSUMER_REQ_TOPIC_URI:
@@ -252,6 +319,26 @@ void NSConsumerTaskProcessing(NSTask * task)
             NSConsumerInternalTaskProcessing(task);
             break;
         }
+        case TASK_CONSUMER_PROVIDER_DISCOVERED:
+        {
+            NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
+            NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
+                        NSRemoveProvider_internal((void *) task->taskData));
+            getTopicTask->nextTask = NULL;
+            getTopicTask->taskData =
+                    (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
+            getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
+            NSConsumerCommunicationTaskProcessing(getTopicTask);
+            NSConsumerInternalTaskProcessing(task);
+            break;
+        }
+#ifdef WITH_MQ
+        case TASK_MQ_REQ_SUBSCRIBE:
+        {
+            NSConsumerMQTaskProcessing(task);
+            break;
+        }
+#endif
         default:
         {
             NS_LOG(ERROR, "Unknown type of task");
@@ -260,13 +347,6 @@ void NSConsumerTaskProcessing(NSTask * task)
     }
 }
 
-NSMessage * NSConsumerFindNSMessage(const char* messageId)
-{
-    NS_VERIFY_NOT_NULL(messageId, NULL);
-
-    return NSMessageCacheFind(messageId);
-}
-
 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
 {
     NS_VERIFY_NOT_NULL(providerId, NULL);