[IOT-1592] Fix unit test of notification service.
[contrib/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSConsumerScheduler.h"
22
23 #include <stdlib.h>
24 #include <stdbool.h>
25 #include <unistd.h>
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocrandom.h"
30
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
35
36 #include "NSThread.h"
37 #include "NSConsumerQueue.h"
38
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
43
44 #ifdef WITH_MQ
45 #include "NSConsumerMQPlugin.h"
46 #endif
47
48 void * NSConsumerMsgHandleThreadFunc(void * handle);
49
50 void * NSConsumerMsgPushThreadFunc(void * data);
51
52 void NSConsumerTaskProcessing(NSTask * task);
53
54 NSConsumerThread ** NSGetMsgHandleThreadHandle()
55 {
56     static NSConsumerThread * handle = NULL;
57     return & handle;
58 }
59
60 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
61 {
62    *(NSGetMsgHandleThreadHandle()) = handle;
63 }
64
65 NSConsumerQueue ** NSGetMsgHandleQueue()
66 {
67     static NSConsumerQueue * queue = NULL;
68     return & queue;
69 }
70
71 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
72 {
73    *(NSGetMsgHandleQueue()) = queue;
74 }
75
76 NSResult NSConsumerMessageHandlerInit()
77 {
78     NSConsumerThread * handle = NULL;
79     NSConsumerQueue * queue = NULL;
80
81     char * consumerUuid = (char *)OCGetServerInstanceIDString();
82     NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
83
84     NSSetConsumerId(consumerUuid);
85     NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
86
87     NS_LOG(DEBUG, "listener init");
88     NSResult ret = NSConsumerListenerInit();
89     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
90
91     NS_LOG(DEBUG, "system init");
92     ret = NSConsumerSystemInit();
93     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
94
95     NS_LOG(DEBUG, "create queue");
96     queue = NSCreateQueue();
97     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
98     NSSetMsgHandleQueue(queue);
99
100     NS_LOG(DEBUG, "queue thread init");
101     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
102     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
103     NSSetMsgHandleThreadHandle(handle);
104
105     return NS_OK;
106 }
107
108 NSResult NSConsumerPushEvent(NSTask * task)
109 {
110     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
111     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
112
113     NSDestroyThreadHandle(thread);
114     NSOICFree(thread);
115
116     return NS_OK;
117 }
118
119 void NSConsumerMessageHandlerExit()
120 {
121
122     NSConsumerListenerTermiate();
123     NSCancelAllSubscription();
124
125
126     NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
127     NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
128
129     NSThreadLock(thread);
130     NS_LOG(DEBUG, "Execute remaining task");
131     while (!NSIsQueueEmpty(queue))
132     {
133         NSConsumerQueueObject * obj = NSPopQueue(queue);
134         NS_LOG_V(DEBUG, "Execute remaining task type : %d", ((NSTask *)(obj->data))->taskType);
135
136         if (obj)
137         {
138             NSConsumerTaskProcessing((NSTask *)(obj->data));
139             NSOICFree(obj);
140         }
141     }
142     NSThreadUnlock(thread);
143
144     NSDestroyQueue(queue);
145     NSOICFree(queue);
146     NSSetMsgHandleQueue(NULL);
147
148     NSThreadLock(thread);
149     NSThreadStop(thread);
150     NSSetMsgHandleThreadHandle(NULL);
151     NSThreadUnlock(thread);
152     NSOICFree(thread);
153
154     NSDestroyInternalCachedList();
155 }
156
157 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
158 {
159     NSConsumerQueueObject * obj = NULL;
160
161     NS_LOG(DEBUG, "create thread for consumer message handle");
162     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
163     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
164
165     while (true)
166     {
167         queueHandleThread = *(NSGetMsgHandleThreadHandle());
168         if (NULL == queueHandleThread)
169         {
170             break;
171         }
172
173         NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
174         if (!queue)
175         {
176             usleep(2000);
177             queue = *(NSGetMsgHandleQueue());
178             continue;
179         }
180
181         if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
182         {
183             NS_LOG(ERROR, "msg handler thread will be terminated");
184             break;
185         }
186
187         if (NSIsQueueEmpty(queue))
188         {
189             usleep(2000);
190             continue;
191         }
192
193         NSThreadLock(queueHandleThread);
194         NS_LOG(DEBUG, "msg handler working");
195         obj = NSPopQueue(queue);
196
197         if (obj)
198         {
199             NSConsumerTaskProcessing((NSTask *)(obj->data));
200             NSOICFree(obj);
201         }
202
203         NSThreadUnlock(queueHandleThread);
204
205     }
206
207     return NULL;
208 }
209
210 void * NSConsumerMsgPushThreadFunc(void * data)
211 {
212     NSConsumerQueueObject * obj = NULL;
213     NSConsumerQueue * queue = NULL;
214
215     NS_LOG(DEBUG, "get queueThread handle");
216     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
217     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
218
219     NS_LOG(DEBUG, "create queue object");
220     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
221     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
222
223     obj->data = data;
224     obj->next = NULL;
225
226     NSThreadLock(msgHandleThread);
227
228     queue = *(NSGetMsgHandleQueue());
229     if (!queue)
230     {
231         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
232         NSOICFree(data);
233         NSOICFree(obj);
234     }
235     else
236     {
237         NSPushConsumerQueue(queue, obj);
238     }
239
240     NSThreadUnlock(msgHandleThread);
241
242     return NULL;
243 }
244
245 void NSProviderDeletedPostClean(
246         NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
247 {
248     if (task && task->taskData)
249     {
250         if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
251         {
252             NSRemoveProvider((NSProvider *) task->taskData);
253         }
254         else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
255         {
256             NSOICFree(task->taskData);
257         }
258         NSOICFree(task);
259     }
260
261     if (prov1)
262     {
263         NSRemoveProvider_internal(prov1);
264     }
265
266     if (prov2)
267     {
268         NSRemoveProvider_internal(prov2);
269     }
270 }
271
272 void NSConsumerTaskProcessing(NSTask * task)
273 {
274     switch (task->taskType)
275     {
276         case TASK_EVENT_CONNECTED:
277         case TASK_EVENT_CONNECTED_TCP:
278         case TASK_CONSUMER_REQ_DISCOVER:
279         {
280             NSConsumerDiscoveryTaskProcessing(task);
281             break;
282         }
283         case TASK_CONSUMER_REQ_SUBSCRIBE:
284         {
285             NSProvider_internal * prov =
286                     NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
287             NS_VERIFY_NOT_NULL_V(prov);
288             NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
289             NS_VERIFY_NOT_NULL_V(subTask);
290             NSConsumerCommunicationTaskProcessing(subTask);
291
292             NSRemoveProvider((NSProvider *)task->taskData);
293             NSOICFree(task);
294             break;
295         }
296         case TASK_SEND_SYNCINFO:
297         case TASK_CONSUMER_REQ_TOPIC_LIST:
298         case TASK_CONSUMER_SELECT_TOPIC_LIST:
299         {
300             NSConsumerCommunicationTaskProcessing(task);
301             break;
302         }
303         case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
304         case TASK_CONSUMER_PROVIDER_DELETED:
305         {
306             NSProvider_internal * data = NULL;
307
308             if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
309             {
310                 data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
311                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
312                         data, NSProviderDeletedPostClean(task, NULL, NULL));
313             }
314             else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
315             {
316                 data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
317                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
318                         data, NSProviderDeletedPostClean(task, NULL, NULL));
319             }
320
321             NSProvider_internal * data2 = NSCopyProvider_internal(data);
322             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
323                         data2, NSProviderDeletedPostClean(task, data, NULL));
324
325             NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
326             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
327                         conTask, NSProviderDeletedPostClean(task, data, data2));
328             NSConsumerCommunicationTaskProcessing(conTask);
329
330             NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
331             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
332                         conTask, NSProviderDeletedPostClean(task, NULL, data2));
333             NSConsumerInternalTaskProcessing(conTask2);
334
335             NSProviderDeletedPostClean(task, NULL, NULL);
336             break;
337         }
338         case TASK_RECV_SYNCINFO:
339         case TASK_CONSUMER_RECV_MESSAGE:
340         case TASK_CONSUMER_SENT_REQ_OBSERVE:
341         case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
342         case TASK_MAKE_SYNCINFO:
343         case TASK_CONSUMER_REQ_TOPIC_URI:
344         case TASK_CONSUMER_RECV_TOPIC_LIST:
345         {
346             NSConsumerInternalTaskProcessing(task);
347             break;
348         }
349         case TASK_CONSUMER_PROVIDER_DISCOVERED:
350         {
351             NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
352             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
353             {
354                 NSRemoveProvider_internal((void *) task->taskData);
355                 NSOICFree(task);
356             });
357             getTopicTask->nextTask = NULL;
358             getTopicTask->taskData =
359                     (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
360             getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
361             NSConsumerCommunicationTaskProcessing(getTopicTask);
362             NSConsumerInternalTaskProcessing(task);
363             break;
364         }
365 #ifdef WITH_MQ
366         case TASK_MQ_REQ_SUBSCRIBE:
367         {
368             NSConsumerMQTaskProcessing(task);
369             break;
370         }
371 #endif
372         default:
373         {
374             NS_LOG(ERROR, "Unknown type of task");
375             break;
376         }
377     }
378 }
379
380 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
381 {
382     NS_VERIFY_NOT_NULL(providerId, NULL);
383
384     return NSProviderCacheFind(providerId);
385 }