Merge notification service from master branch
[platform/upstream/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSConsumerScheduler.h"
22
23 #include <stdlib.h>
24 #include <stdbool.h>
25 #include <unistd.h>
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocrandom.h"
30
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
35
36 #include "NSThread.h"
37 #include "NSConsumerQueue.h"
38
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
43
44 void * NSConsumerMsgHandleThreadFunc(void * handle);
45
46 void * NSConsumerMsgPushThreadFunc(void * data);
47
48 void NSConsumerTaskProcessing(NSTask * task);
49
50 NSConsumerThread ** NSGetMsgHandleThreadHandle()
51 {
52     static NSConsumerThread * handle = NULL;
53     return & handle;
54 }
55
56 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
57 {
58    *(NSGetMsgHandleThreadHandle()) = handle;
59 }
60
61 NSConsumerQueue ** NSGetMsgHandleQueue()
62 {
63     static NSConsumerQueue * queue = NULL;
64     return & queue;
65 }
66
67 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
68 {
69    *(NSGetMsgHandleQueue()) = queue;
70 }
71
72 NSResult NSConsumerMessageHandlerInit()
73 {
74     NSConsumerThread * handle = NULL;
75     NSConsumerQueue * queue = NULL;
76
77     char * consumerUuid = (char *)OCGetServerInstanceIDString();
78     NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
79
80     NSSetConsumerId(consumerUuid);
81     NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
82
83     NS_LOG(DEBUG, "listener init");
84     NSResult ret = NSConsumerListenerInit();
85     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
86
87     NS_LOG(DEBUG, "system init");
88     ret = NSConsumerSystemInit();
89     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
90
91     NS_LOG(DEBUG, "queue thread init");
92     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
93     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
94     NSSetMsgHandleThreadHandle(handle);
95
96     NS_LOG(DEBUG, "create queue");
97     queue = NSCreateQueue();
98     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
99     NSSetMsgHandleQueue(queue);
100
101     return NS_OK;
102 }
103
104 NSResult NSConsumerPushEvent(NSTask * task)
105 {
106     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
107     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
108
109     NSDestroyThreadHandle(thread);
110
111     return NS_OK;
112 }
113
114 void NSConsumerMessageHandlerExit()
115 {
116
117     NSConsumerListenerTermiate();
118     NSCancelAllSubscription();
119
120     NSThreadStop(*(NSGetMsgHandleThreadHandle()));
121     NSSetMsgHandleThreadHandle(NULL);
122
123     NSDestroyQueue(*(NSGetMsgHandleQueue()));
124     NSSetMsgHandleQueue(NULL);
125
126     NSDestroyInternalCachedList();
127 }
128
129 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
130 {
131     NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
132     NSConsumerQueueObject * obj = NULL;
133
134     NS_LOG(DEBUG, "create thread for consumer message handle");
135     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
136     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
137
138     while (true)
139     {
140         if (!queue)
141         {
142             queue = *(NSGetMsgHandleQueue());
143             usleep(2000);
144             continue;
145         }
146
147         if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
148         {
149             NS_LOG(ERROR, "msg handler thread will be terminated");
150             break;
151         }
152
153         if (NSIsQueueEmpty(queue))
154         {
155             usleep(2000);
156             continue;
157         }
158
159         NSThreadLock(queueHandleThread);
160         NS_LOG(DEBUG, "msg handler working");
161         obj = NSPopQueue(queue);
162
163         if (obj)
164         {
165             NSConsumerTaskProcessing((NSTask *)(obj->data));
166         }
167
168         NSThreadUnlock(queueHandleThread);
169
170     }
171
172     return NULL;
173 }
174
175 void * NSConsumerMsgPushThreadFunc(void * data)
176 {
177     NSThreadDetach();
178
179     NSConsumerQueueObject * obj = NULL;
180     NSConsumerQueue * queue = NULL;
181
182     NS_LOG(DEBUG, "get queueThread handle");
183     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
184     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
185
186     NS_LOG(DEBUG, "create queue object");
187     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
188     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
189
190     obj->data = data;
191     obj->next = NULL;
192
193     NSThreadLock(msgHandleThread);
194
195     queue = *(NSGetMsgHandleQueue());
196     if (!queue)
197     {
198         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
199         NSOICFree(data);
200         NSOICFree(obj);
201     }
202     else
203     {
204         NSPushConsumerQueue(queue, obj);
205     }
206
207     NSThreadUnlock(msgHandleThread);
208
209     return NULL;
210 }
211
212 void NSProviderDeletedPostClean(
213         NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
214 {
215     if (task && task->taskData)
216     {
217         if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
218         {
219             NSRemoveProvider((NSProvider *) task->taskData);
220         }
221         else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
222         {
223             NSOICFree(task->taskData);
224         }
225         NSOICFree(task);
226     }
227
228     if (prov1)
229     {
230         NSRemoveProvider_internal(prov1);
231     }
232
233     if (prov2)
234     {
235         NSRemoveProvider_internal(prov2);
236     }
237 }
238
239 void NSConsumerTaskProcessing(NSTask * task)
240 {
241     switch (task->taskType)
242     {
243         case TASK_EVENT_CONNECTED:
244         case TASK_EVENT_CONNECTED_TCP:
245         case TASK_CONSUMER_REQ_DISCOVER:
246         {
247             NSConsumerDiscoveryTaskProcessing(task);
248             break;
249         }
250         case TASK_CONSUMER_REQ_SUBSCRIBE:
251         {
252             NSProvider_internal * prov =
253                     NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
254             NS_VERIFY_NOT_NULL_V(prov);
255             NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
256             NS_VERIFY_NOT_NULL_V(subTask);
257             NSConsumerCommunicationTaskProcessing(subTask);
258
259             NSRemoveProvider((NSProvider *)task->taskData);
260             NSOICFree(task);
261             break;
262         }
263         case TASK_SEND_SYNCINFO:
264         case TASK_CONSUMER_REQ_TOPIC_LIST:
265         case TASK_CONSUMER_SELECT_TOPIC_LIST:
266         {
267             NSConsumerCommunicationTaskProcessing(task);
268             break;
269         }
270         case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
271         case TASK_CONSUMER_PROVIDER_DELETED:
272         {
273             NSProvider_internal * data = NULL;
274
275             if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
276             {
277                 data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
278                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
279                         data, NSProviderDeletedPostClean(task, NULL, NULL));
280             }
281             else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
282             {
283                 data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
284                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
285                         data, NSProviderDeletedPostClean(task, NULL, NULL));
286             }
287
288             NSProvider_internal * data2 = NSCopyProvider_internal(data);
289             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
290                         data2, NSProviderDeletedPostClean(task, data, NULL));
291
292             NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
293             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
294                         conTask, NSProviderDeletedPostClean(task, data, data2));
295             NSConsumerCommunicationTaskProcessing(conTask);
296
297             NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
298             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
299                         conTask, NSProviderDeletedPostClean(task, NULL, data2));
300             NSConsumerInternalTaskProcessing(conTask2);
301
302             NSProviderDeletedPostClean(task, NULL, NULL);
303             break;
304         }
305         case TASK_RECV_SYNCINFO:
306         case TASK_CONSUMER_RECV_MESSAGE:
307         case TASK_CONSUMER_SENT_REQ_OBSERVE:
308         case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
309         case TASK_MAKE_SYNCINFO:
310         case TASK_CONSUMER_REQ_TOPIC_URI:
311         case TASK_CONSUMER_RECV_TOPIC_LIST:
312         {
313             NSConsumerInternalTaskProcessing(task);
314             break;
315         }
316         case TASK_CONSUMER_PROVIDER_DISCOVERED:
317         {
318             NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
319             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
320                         NSRemoveProvider_internal((NSProvider_internal *) task->taskData));
321             getTopicTask->nextTask = NULL;
322             getTopicTask->taskData =
323                     (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
324             getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
325             NSConsumerCommunicationTaskProcessing(getTopicTask);
326             NSConsumerInternalTaskProcessing(task);
327             break;
328         }
329         default:
330         {
331             NS_LOG(ERROR, "Unknown type of task");
332             break;
333         }
334     }
335 }
336
337 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
338 {
339     NS_VERIFY_NOT_NULL(providerId, NULL);
340
341     return NSProviderCacheFind(providerId);
342 }