Merge "Merge remote-tracking branch 'origin/notification-service' Updated with static...
[contrib/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSConsumerScheduler.h"
22
23 #include <stdlib.h>
24 #include <stdbool.h>
25 #include <unistd.h>
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocrandom.h"
30
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
35
36 #include "NSThread.h"
37 #include "NSConsumerQueue.h"
38
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
43
44 void * NSConsumerMsgHandleThreadFunc(void * handle);
45
46 void * NSConsumerMsgPushThreadFunc(void * data);
47
48 void NSConsumerTaskProcessing(NSTask * task);
49
50 NSConsumerThread ** NSGetMsgHandleThreadHandle()
51 {
52     static NSConsumerThread * handle = NULL;
53     return & handle;
54 }
55
56 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
57 {
58    *(NSGetMsgHandleThreadHandle()) = handle;
59 }
60
61 NSConsumerQueue ** NSGetMsgHandleQueue()
62 {
63     static NSConsumerQueue * queue = NULL;
64     return & queue;
65 }
66
67 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
68 {
69    *(NSGetMsgHandleQueue()) = queue;
70 }
71
72 NSResult NSConsumerMessageHandlerInit()
73 {
74     NSConsumerThread * handle = NULL;
75     NSConsumerQueue * queue = NULL;
76
77     uint8_t uuid[UUID_SIZE] = {0,};
78     char uuidString[UUID_STRING_SIZE] = {0,};
79     OCRandomUuidResult randomRet = OCGenerateUuid(uuid);
80     NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
81     randomRet = OCConvertUuidToString(uuid, uuidString);
82     NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
83
84     NSSetConsumerId(uuidString);
85     NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
86
87     NS_LOG(DEBUG, "listener init");
88     NSResult ret = NSConsumerListenerInit();
89     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
90
91     NS_LOG(DEBUG, "system init");
92     ret = NSConsumerSystemInit();
93     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
94
95     NS_LOG(DEBUG, "queue thread init");
96     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
97     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
98     NSSetMsgHandleThreadHandle(handle);
99
100     NS_LOG(DEBUG, "create queue");
101     queue = NSCreateQueue();
102     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
103     NSSetMsgHandleQueue(queue);
104
105     return NS_OK;
106 }
107
108 NSResult NSConsumerPushEvent(NSTask * task)
109 {
110     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
111     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
112
113     NSDestroyThreadHandle(thread);
114
115     return NS_OK;
116 }
117
118 void NSConsumerMessageHandlerExit()
119 {
120
121     NSConsumerListenerTermiate();
122     NSCancelAllSubscription();
123
124     NSThreadStop(*(NSGetMsgHandleThreadHandle()));
125     NSSetMsgHandleThreadHandle(NULL);
126
127     NSDestroyQueue(*(NSGetMsgHandleQueue()));
128     NSSetMsgHandleQueue(NULL);
129
130     NSDestroyInternalCachedList();
131 }
132
133 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
134 {
135     NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
136     NSConsumerQueueObject * obj = NULL;
137
138     NS_LOG(DEBUG, "create thread for consumer message handle");
139     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
140     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
141
142     while (true)
143     {
144         if (!queue)
145         {
146             queue = *(NSGetMsgHandleQueue());
147             usleep(2000);
148             continue;
149         }
150
151         if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
152         {
153             NS_LOG(ERROR, "msg handler thread will be terminated");
154             break;
155         }
156
157         if (NSIsQueueEmpty(queue))
158         {
159             usleep(2000);
160             continue;
161         }
162
163         NSThreadLock(queueHandleThread);
164         NS_LOG(DEBUG, "msg handler working");
165         obj = NSPopQueue(queue);
166
167         if (obj)
168         {
169             NSConsumerTaskProcessing((NSTask *)(obj->data));
170         }
171
172         NSThreadUnlock(queueHandleThread);
173
174     }
175
176     return NULL;
177 }
178
179 void * NSConsumerMsgPushThreadFunc(void * data)
180 {
181     NSThreadDetach();
182
183     NSConsumerQueueObject * obj = NULL;
184     NSConsumerQueue * queue = NULL;
185
186     NS_LOG(DEBUG, "get queueThread handle");
187     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
188     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
189
190     NS_LOG(DEBUG, "create queue object");
191     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
192     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
193
194     obj->data = data;
195     obj->next = NULL;
196
197     NSThreadLock(msgHandleThread);
198
199     queue = *(NSGetMsgHandleQueue());
200     if (!queue)
201     {
202         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
203         NSOICFree(data);
204         NSOICFree(obj);
205     }
206     else
207     {
208         NSPushQueue(queue, obj);
209     }
210
211     NSThreadUnlock(msgHandleThread);
212
213     return NULL;
214 }
215
216 void NSProviderDeletedPostClean(
217         NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
218 {
219     if (task && task->taskData)
220     {
221         if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
222         {
223             NSRemoveProvider((NSProvider *) task->taskData);
224         }
225         else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
226         {
227             NSOICFree(task->taskData);
228         }
229         NSOICFree(task);
230     }
231
232     if (prov1)
233     {
234         NSRemoveProvider_internal(prov1);
235     }
236
237     if (prov2)
238     {
239         NSRemoveProvider_internal(prov2);
240     }
241 }
242
243 void NSConsumerTaskProcessing(NSTask * task)
244 {
245     switch (task->taskType)
246     {
247         case TASK_EVENT_CONNECTED:
248         case TASK_EVENT_CONNECTED_TCP:
249         case TASK_CONSUMER_REQ_DISCOVER:
250         {
251             NSConsumerDiscoveryTaskProcessing(task);
252             break;
253         }
254         case TASK_CONSUMER_REQ_SUBSCRIBE:
255         {
256             NSProvider_internal * prov =
257                     NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
258             NS_VERIFY_NOT_NULL_V(prov);
259             NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
260             NS_VERIFY_NOT_NULL_V(subTask);
261             NSConsumerCommunicationTaskProcessing(subTask);
262
263             NSRemoveProvider((NSProvider *)task->taskData);
264             NSOICFree(task);
265             break;
266         }
267         case TASK_SEND_SYNCINFO:
268         case TASK_CONSUMER_REQ_TOPIC_LIST:
269         case TASK_CONSUMER_SELECT_TOPIC_LIST:
270         {
271             NSConsumerCommunicationTaskProcessing(task);
272             break;
273         }
274         case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
275         case TASK_CONSUMER_PROVIDER_DELETED:
276         {
277             NSProvider_internal * data = NULL;
278
279             if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
280             {
281                 data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
282                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
283                         data, NSProviderDeletedPostClean(task, NULL, NULL));
284             }
285             else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
286             {
287                 data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
288                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
289                         data, NSProviderDeletedPostClean(task, NULL, NULL));
290             }
291
292             NSProvider_internal * data2 = NSCopyProvider_internal(data);
293             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
294                         data2, NSProviderDeletedPostClean(task, data, NULL));
295
296             NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
297             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
298                         conTask, NSProviderDeletedPostClean(task, data, data2));
299             NSConsumerCommunicationTaskProcessing(conTask);
300
301             NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
302             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
303                         conTask, NSProviderDeletedPostClean(task, NULL, data2));
304             NSConsumerInternalTaskProcessing(conTask2);
305
306             NSProviderDeletedPostClean(task, NULL, NULL);
307             break;
308         }
309         case TASK_RECV_SYNCINFO:
310         case TASK_CONSUMER_RECV_MESSAGE:
311         case TASK_CONSUMER_SENT_REQ_OBSERVE:
312         case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
313         case TASK_MAKE_SYNCINFO:
314         case TASK_CONSUMER_REQ_TOPIC_URI:
315         case TASK_CONSUMER_RECV_TOPIC_LIST:
316         {
317             NSConsumerInternalTaskProcessing(task);
318             break;
319         }
320         case TASK_CONSUMER_PROVIDER_DISCOVERED:
321         {
322             NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
323             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
324                         NSRemoveProvider_internal((NSProvider_internal *) task->taskData));
325             getTopicTask->nextTask = NULL;
326             getTopicTask->taskData =
327                     (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
328             getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
329             NSConsumerCommunicationTaskProcessing(getTopicTask);
330             NSConsumerInternalTaskProcessing(task);
331             break;
332         }
333         default:
334         {
335             NS_LOG(ERROR, "Unknown type of task");
336             break;
337         }
338     }
339 }
340
341 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
342 {
343     NS_VERIFY_NOT_NULL(providerId, NULL);
344
345     return NSProviderCacheFind(providerId);
346 }