Update snapshot(2017-12-14)
[platform/upstream/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSConsumerScheduler.h"
22
23 #include <stdlib.h>
24 #include <stdbool.h>
25 #include <unistd.h>
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocrandom.h"
30
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
35
36 #include "NSThread.h"
37 #include "NSConsumerQueue.h"
38
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
43
44 #ifdef WITH_MQ
45 #include "NSConsumerMQPlugin.h"
46 #endif
47
48 void * NSConsumerMsgHandleThreadFunc(void * handle);
49
50 void * NSConsumerMsgPushThreadFunc(void * data);
51
52 void NSConsumerTaskProcessing(NSTask * task);
53
54 NSConsumerThread ** NSGetMsgHandleThreadHandle()
55 {
56     static NSConsumerThread * handle = NULL;
57     return & handle;
58 }
59
60 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
61 {
62    *(NSGetMsgHandleThreadHandle()) = handle;
63 }
64
65 NSConsumerQueue ** NSGetMsgHandleQueue()
66 {
67     static NSConsumerQueue * queue = NULL;
68     return & queue;
69 }
70
71 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
72 {
73    *(NSGetMsgHandleQueue()) = queue;
74 }
75
76 NSResult NSConsumerMessageHandlerInit()
77 {
78     NSConsumerThread * handle = NULL;
79     NSConsumerQueue * queue = NULL;
80
81     char * consumerUuid = (char *)OCGetServerInstanceIDString();
82     NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
83
84     NSSetConsumerId(consumerUuid);
85     NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
86
87     NS_LOG(DEBUG, "listener init");
88     NSResult ret = NSConsumerListenerInit();
89     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
90
91     NS_LOG(DEBUG, "system init");
92     ret = NSConsumerSystemInit();
93     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
94
95     NS_LOG(DEBUG, "create queue");
96     queue = NSCreateQueue();
97     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
98     NSSetMsgHandleQueue(queue);
99
100     NS_LOG(DEBUG, "queue thread init");
101     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
102     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
103     NSSetMsgHandleThreadHandle(handle);
104
105     return NS_OK;
106 }
107
108 NSResult NSConsumerPushEvent(NSTask * task)
109 {
110     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
111     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
112
113     NSDestroyThreadHandle(thread);
114     NSOICFree(thread);
115
116     return NS_OK;
117 }
118
119 void NSConsumerMessageHandlerExit()
120 {
121
122     NSConsumerListenerTermiate();
123     NSCancelAllSubscription();
124
125     NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
126     NSThreadStop(thread);
127     NSSetMsgHandleThreadHandle(NULL);
128
129     NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
130     NSDestroyQueue(queue);
131     NSSetMsgHandleQueue(NULL);
132
133     NSDestroyInternalCachedList();
134 }
135
136 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
137 {
138     NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
139     NSConsumerQueueObject * obj = NULL;
140
141     NS_LOG(DEBUG, "create thread for consumer message handle");
142     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
143     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
144
145     while (true)
146     {
147         if (!queue)
148         {
149             queue = *(NSGetMsgHandleQueue());
150             usleep(2000);
151             continue;
152         }
153
154         if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
155         {
156             NS_LOG(ERROR, "msg handler thread will be terminated");
157             break;
158         }
159
160         if (NSIsQueueEmpty(queue))
161         {
162             usleep(2000);
163             continue;
164         }
165
166         NSThreadLock(queueHandleThread);
167         NS_LOG(DEBUG, "msg handler working");
168         obj = NSPopQueue(queue);
169
170         if (obj)
171         {
172             NSConsumerTaskProcessing((NSTask *)(obj->data));
173             NSOICFree(obj);
174         }
175
176         NSThreadUnlock(queueHandleThread);
177
178     }
179
180     return NULL;
181 }
182
183 void * NSConsumerMsgPushThreadFunc(void * data)
184 {
185     NSConsumerQueueObject * obj = NULL;
186     NSConsumerQueue * queue = NULL;
187
188     NS_LOG(DEBUG, "get queueThread handle");
189     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
190     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
191
192     NS_LOG(DEBUG, "create queue object");
193     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
194     NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
195
196     obj->data = data;
197     obj->next = NULL;
198
199     NSThreadLock(msgHandleThread);
200
201     queue = *(NSGetMsgHandleQueue());
202     if (!queue)
203     {
204         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
205         NSOICFree(data);
206         NSOICFree(obj);
207     }
208     else
209     {
210         NSPushConsumerQueue(queue, obj);
211     }
212
213     NSThreadUnlock(msgHandleThread);
214
215     return NULL;
216 }
217
218 void NSProviderDeletedPostClean(
219         NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
220 {
221     if (task && task->taskData)
222     {
223         if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
224         {
225             NSRemoveProvider((NSProvider *) task->taskData);
226         }
227         else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
228         {
229             NSOICFree(task->taskData);
230         }
231         NSOICFree(task);
232     }
233
234     if (prov1)
235     {
236         NSRemoveProvider_internal(prov1);
237     }
238
239     if (prov2)
240     {
241         NSRemoveProvider_internal(prov2);
242     }
243 }
244
245 void NSConsumerTaskProcessing(NSTask * task)
246 {
247     switch (task->taskType)
248     {
249         case TASK_EVENT_CONNECTED:
250         case TASK_EVENT_CONNECTED_TCP:
251         case TASK_CONSUMER_REQ_DISCOVER:
252         {
253             NSConsumerDiscoveryTaskProcessing(task);
254             break;
255         }
256         case TASK_CONSUMER_REQ_SUBSCRIBE:
257         {
258             NSProvider_internal * prov =
259                     NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
260             NS_VERIFY_NOT_NULL_V(prov);
261             NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
262             NS_VERIFY_NOT_NULL_V(subTask);
263             NSConsumerCommunicationTaskProcessing(subTask);
264
265             NSRemoveProvider((NSProvider *)task->taskData);
266             NSOICFree(task);
267             break;
268         }
269         case TASK_SEND_SYNCINFO:
270         case TASK_CONSUMER_REQ_TOPIC_LIST:
271         case TASK_CONSUMER_SELECT_TOPIC_LIST:
272         {
273             NSConsumerCommunicationTaskProcessing(task);
274             break;
275         }
276         case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
277         case TASK_CONSUMER_PROVIDER_DELETED:
278         {
279             NSProvider_internal * data = NULL;
280
281             if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
282             {
283                 data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
284                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
285                         data, NSProviderDeletedPostClean(task, NULL, NULL));
286             }
287             else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
288             {
289                 data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
290                 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
291                         data, NSProviderDeletedPostClean(task, NULL, NULL));
292             }
293
294             NSProvider_internal * data2 = NSCopyProvider_internal(data);
295             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
296                         data2, NSProviderDeletedPostClean(task, data, NULL));
297
298             NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
299             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
300                         conTask, NSProviderDeletedPostClean(task, data, data2));
301             NSConsumerCommunicationTaskProcessing(conTask);
302
303             NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
304             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
305                         conTask, NSProviderDeletedPostClean(task, NULL, data2));
306             NSConsumerInternalTaskProcessing(conTask2);
307
308             NSProviderDeletedPostClean(task, NULL, NULL);
309             break;
310         }
311         case TASK_RECV_SYNCINFO:
312         case TASK_CONSUMER_RECV_MESSAGE:
313         case TASK_CONSUMER_SENT_REQ_OBSERVE:
314         case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
315         case TASK_MAKE_SYNCINFO:
316         case TASK_CONSUMER_REQ_TOPIC_URI:
317         case TASK_CONSUMER_RECV_TOPIC_LIST:
318         {
319             NSConsumerInternalTaskProcessing(task);
320             break;
321         }
322         case TASK_CONSUMER_PROVIDER_DISCOVERED:
323         {
324             NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
325             NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
326                         NSRemoveProvider_internal((void *) task->taskData));
327             getTopicTask->nextTask = NULL;
328             getTopicTask->taskData =
329                     (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
330             getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
331             NSConsumerCommunicationTaskProcessing(getTopicTask);
332             NSConsumerInternalTaskProcessing(task);
333             break;
334         }
335 #ifdef WITH_MQ
336         case TASK_MQ_REQ_SUBSCRIBE:
337         {
338             NSConsumerMQTaskProcessing(task);
339             break;
340         }
341 #endif
342         default:
343         {
344             NS_LOG(ERROR, "Unknown type of task");
345             break;
346         }
347     }
348 }
349
350 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
351 {
352     NS_VERIFY_NOT_NULL(providerId, NULL);
353
354     return NSProviderCacheFind(providerId);
355 }