Fix to prevent of crash on the unit test.
[contrib/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSConsumerScheduler.h"
22
23 #include <stdlib.h>
24 #include <stdbool.h>
25 #include <unistd.h>
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocrandom.h"
30
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
35
36 #include "NSThread.h"
37 #include "NSConsumerQueue.h"
38
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
43
44 #ifdef WITH_MQ
45 #include "NSConsumerMQPlugin.h"
46 #endif
47
48 void * NSConsumerMsgHandleThreadFunc(void * handle);
49
50 void * NSConsumerMsgPushThreadFunc(void * data);
51
52 void NSConsumerTaskProcessing(NSTask * task);
53
54 NSConsumerThread ** NSGetMsgHandleThreadHandle()
55 {
56     static NSConsumerThread * handle = NULL;
57     return & handle;
58 }
59
60 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
61 {
62    *(NSGetMsgHandleThreadHandle()) = handle;
63 }
64
65 NSConsumerQueue ** NSGetMsgHandleQueue()
66 {
67     static NSConsumerQueue * queue = NULL;
68     return & queue;
69 }
70
71 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
72 {
73    *(NSGetMsgHandleQueue()) = queue;
74 }
75
76 NSResult NSConsumerMessageHandlerInit()
77 {
78     NSConsumerThread * handle = NULL;
79     NSConsumerQueue * queue = NULL;
80
81     char * consumerUuid = (char *)OCGetServerInstanceIDString();
82     NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
83
84     NSSetConsumerId(consumerUuid);
85     NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
86
87     NS_LOG(DEBUG, "listener init");
88     NSResult ret = NSConsumerListenerInit();
89     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
90
91     NS_LOG(DEBUG, "system init");
92     ret = NSConsumerSystemInit();
93     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
94
95     NS_LOG(DEBUG, "create queue");
96     queue = NSCreateQueue();
97     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
98     NSSetMsgHandleQueue(queue);
99
100     NS_LOG(DEBUG, "queue thread init");
101     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
102     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
103     NSSetMsgHandleThreadHandle(handle);
104
105     return NS_OK;
106 }
107
108 NSResult NSConsumerPushEvent(NSTask * task)
109 {
110     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
111     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
112
113     NSDestroyThreadHandle(thread);
114     NSOICFree(thread);
115
116     return NS_OK;
117 }
118
119 void NSConsumerMessageHandlerExit()
120 {
121
122     NSConsumerListenerTermiate();
123     NSCancelAllSubscription();
124
125
126     NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
127     NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
128
129     NSThreadLock(thread);
130     NS_LOG(DEBUG, "Execute remaining task");
131     while (!NSIsQueueEmpty(queue))
132     {
133         NSConsumerQueueObject * obj = NSPopQueue(queue);
134         NS_LOG_V(DEBUG, "Execute remaining task type : %d", ((NSTask *)(obj->data))->taskType);
135
136         if (obj)
137         {
138             NSConsumerTaskProcessing((NSTask *)(obj->data));
139             NSOICFree(obj);
140         }
141     }
142     NSThreadUnlock(thread);
143
144     NSDestroyQueue(queue);
145     NSOICFree(queue);
146     NSSetMsgHandleQueue(NULL);
147
148     NSThreadLock(thread);
149     NSThreadStop(thread);
150     NSSetMsgHandleThreadHandle(NULL);
151     NSThreadUnlock(thread);
152     NSOICFree(thread);
153
154     NSDestroyInternalCachedList();
155 }
156
157 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
158 {
159     NSConsumerQueueObject * obj = NULL;
160
161     NS_LOG(DEBUG, "create thread for consumer message handle");
162     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
163     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
164
165     while (true)
166     {
167         queueHandleThread = *(NSGetMsgHandleThreadHandle());
168         if (NULL == queueHandleThread)
169         {
170             break;
171         }
172
173         NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
174         if (!queue)
175         {
176             usleep(2000);
177             queue = *(NSGetMsgHandleQueue());
178             continue;
179         }
180
181         if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
182         {
183             NS_LOG(ERROR, "msg handler thread will be terminated");
184             break;
185         }
186
187         if (NSIsQueueEmpty(queue))
188         {
189             usleep(2000);
190             continue;
191         }
192
193         NSThreadLock(queueHandleThread);
194         NS_LOG(DEBUG, "msg handler working");
195         queue = *(NSGetMsgHandleQueue());
196         obj = NSPopQueue(queue);
197
198         if (obj)
199         {
200             NSConsumerTaskProcessing((NSTask *)(obj->data));
201             NSOICFree(obj);
202         }
203
204         NSThreadUnlock(queueHandleThread);
205
206     }
207
208     return NULL;
209 }
210
211 void * NSConsumerMsgPushThreadFunc(void * data)
212 {
213     NSConsumerQueueObject * obj = NULL;
214     NSConsumerQueue * queue = NULL;
215
216     NS_LOG(DEBUG, "get queueThread handle");
217     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
218     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
219
220     NS_LOG(DEBUG, "create queue object");
221     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
222     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
223
224     obj->data = data;
225     obj->next = NULL;
226
227     NSThreadLock(msgHandleThread);
228
229     queue = *(NSGetMsgHandleQueue());
230     if (!queue)
231     {
232         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
233         NSOICFree(data);
234         NSOICFree(obj);
235     }
236     else
237     {
238         NSPushConsumerQueue(queue, obj);
239     }
240
241     NSThreadUnlock(msgHandleThread);
242
243     return NULL;
244 }
245
246 void NSProviderDeletedPostClean(
247         NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
248 {
249     if (task && task->taskData)
250     {
251         if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
252         {
253             NSRemoveProvider((NSProvider *) task->taskData);
254         }
255         else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
256         {
257             NSOICFree(task->taskData);
258         }
259         NSOICFree(task);
260     }
261
262     if (prov1)
263     {
264         NSRemoveProvider_internal(prov1);
265     }
266
267     if (prov2)
268     {
269         NSRemoveProvider_internal(prov2);
270     }
271 }
272
273 void NSConsumerTaskProcessing(NSTask * task)
274 {
275     NS_VERIFY_NOT_NULL_V(task);
276     switch (task->taskType)
277     {
278         case TASK_EVENT_CONNECTED:
279         case TASK_EVENT_CONNECTED_TCP:
280         case TASK_CONSUMER_REQ_DISCOVER:
281         {
282             NSConsumerDiscoveryTaskProcessing(task);
283             break;
284         }
285         case TASK_CONSUMER_REQ_SUBSCRIBE:
286         {
287             NSProvider_internal * prov =
288                     NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
289             NS_VERIFY_NOT_NULL_V(prov);
290             NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
291             NS_VERIFY_NOT_NULL_V(subTask);
292             NSConsumerCommunicationTaskProcessing(subTask);
293
294             NSRemoveProvider((NSProvider *)task->taskData);
295             NSOICFree(task);
296             break;
297         }
298         case TASK_SEND_SYNCINFO:
299         case TASK_CONSUMER_REQ_TOPIC_LIST:
300         case TASK_CONSUMER_SELECT_TOPIC_LIST:
301         {
302             NSConsumerCommunicationTaskProcessing(task);
303             break;
304         }
305         case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
306         case TASK_CONSUMER_PROVIDER_DELETED:
307         {
308             NSProvider_internal * data = NULL;
309
310             if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
311             {
312                 data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
313                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
314                         data, NSProviderDeletedPostClean(task, NULL, NULL));
315             }
316             else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
317             {
318                 data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
319                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
320                         data, NSProviderDeletedPostClean(task, NULL, NULL));
321             }
322
323             NSProvider_internal * data2 = NSCopyProvider_internal(data);
324             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
325                         data2, NSProviderDeletedPostClean(task, data, NULL));
326
327             NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
328             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
329                         conTask, NSProviderDeletedPostClean(task, data, data2));
330             NSConsumerCommunicationTaskProcessing(conTask);
331
332             NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
333             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
334                         conTask, NSProviderDeletedPostClean(task, NULL, data2));
335             NSConsumerInternalTaskProcessing(conTask2);
336
337             NSProviderDeletedPostClean(task, NULL, NULL);
338             break;
339         }
340         case TASK_RECV_SYNCINFO:
341         case TASK_CONSUMER_RECV_MESSAGE:
342         case TASK_CONSUMER_SENT_REQ_OBSERVE:
343         case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
344         case TASK_MAKE_SYNCINFO:
345         case TASK_CONSUMER_REQ_TOPIC_URI:
346         case TASK_CONSUMER_RECV_TOPIC_LIST:
347         {
348             NSConsumerInternalTaskProcessing(task);
349             break;
350         }
351         case TASK_CONSUMER_PROVIDER_DISCOVERED:
352         {
353             NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
354             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
355             {
356                 NSRemoveProvider_internal((void *) task->taskData);
357                 NSOICFree(task);
358             });
359             getTopicTask->nextTask = NULL;
360             getTopicTask->taskData =
361                     (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
362             getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
363             NSConsumerCommunicationTaskProcessing(getTopicTask);
364             NSConsumerInternalTaskProcessing(task);
365             break;
366         }
367 #ifdef WITH_MQ
368         case TASK_MQ_REQ_SUBSCRIBE:
369         {
370             NSConsumerMQTaskProcessing(task);
371             break;
372         }
373 #endif
374         default:
375         {
376             NS_LOG(ERROR, "Unknown type of task");
377             break;
378         }
379     }
380 }
381
382 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
383 {
384     NS_VERIFY_NOT_NULL(providerId, NULL);
385
386     return NSProviderCacheFind(providerId);
387 }