1 //******************************************************************
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "NSConsumerScheduler.h"
27 #include "oic_malloc.h"
28 #include "oic_string.h"
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
37 #include "NSConsumerQueue.h"
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
45 #include "NSConsumerMQPlugin.h"
// Forward declarations of the two thread entry points and the task
// dispatcher; all three are defined further down in this file and are
// referenced before their definitions.
48 void * NSConsumerMsgHandleThreadFunc(void * handle);
50 void * NSConsumerMsgPushThreadFunc(void * data);
52 void NSConsumerTaskProcessing(NSTask * task);
// Accessor for the message-handler thread handle, which is kept in a
// function-local static pointer that starts out NULL.
// NOTE(review): the return statement is not visible in this excerpt; callers
// dereference the result (e.g. NSSetMsgHandleThreadHandle), so it presumably
// returns the address of `handle` - confirm.
54 NSConsumerThread ** NSGetMsgHandleThreadHandle()
56 static NSConsumerThread * handle = NULL;
// Stores `handle` in the slot returned by NSGetMsgHandleThreadHandle().
// Passing NULL clears the stored handle.
60 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
62 *(NSGetMsgHandleThreadHandle()) = handle;
// Accessor for the message-handler queue, which is kept in a function-local
// static pointer that starts out NULL.
// NOTE(review): the return statement is not visible in this excerpt; callers
// dereference the result (e.g. NSSetMsgHandleQueue), so it presumably returns
// the address of `queue` - confirm.
65 NSConsumerQueue ** NSGetMsgHandleQueue()
67 static NSConsumerQueue * queue = NULL;
// Stores `queue` in the slot returned by NSGetMsgHandleQueue().
// Passing NULL clears the stored queue pointer.
71 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
73 *(NSGetMsgHandleQueue()) = queue;
// Initialises the consumer message handler. In order, it:
//   1. reads the server instance ID string and records it as the consumer ID,
//   2. initialises the network listener,
//   3. initialises the consumer system,
//   4. creates the message queue and publishes it via NSSetMsgHandleQueue(),
//   5. starts the handler thread (NSConsumerMsgHandleThreadFunc) and
//      publishes its handle via NSSetMsgHandleThreadHandle().
// Every step is checked with NS_VERIFY_NOT_NULL, which is given NS_ERROR as
// its second argument (presumably the value returned on failure - the macro
// is defined elsewhere).
// NOTE(review): the final return statement is not visible in this excerpt.
76 NSResult NSConsumerMessageHandlerInit()
78 NSConsumerThread * handle = NULL;
79 NSConsumerQueue * queue = NULL;
81 char * consumerUuid = (char *)OCGetServerInstanceIDString();
82 NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
84 NSSetConsumerId(consumerUuid);
85 NS_LOG_V(INFO_PRIVATE, "Consumer ID : %s", *NSGetConsumerId());
87 NS_LOG(DEBUG, "listener init");
88 NSResult ret = NSConsumerListenerInit();
// The NSResult is mapped to a non-NULL / NULL pointer so that the
// pointer-checking macro can be reused for a status code.
89 NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
91 NS_LOG(DEBUG, "system init");
92 ret = NSConsumerSystemInit();
93 NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
95 NS_LOG(DEBUG, "create queue");
96 queue = NSCreateQueue();
97 NS_VERIFY_NOT_NULL(queue, NS_ERROR);
98 NSSetMsgHandleQueue(queue);
100 NS_LOG(DEBUG, "queue thread init");
// The queue is published before the handler thread is started.
101 handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
102 NS_VERIFY_NOT_NULL(handle, NS_ERROR);
103 NSSetMsgHandleThreadHandle(handle);
// Hands `task` to a newly started thread running NSConsumerMsgPushThreadFunc,
// which receives the task as its argument, and then destroys the handle of
// that thread. If the thread cannot be created, NS_VERIFY_NOT_NULL is invoked
// with NS_ERROR.
// NOTE(review): the success-path return statement is not visible in this
// excerpt.
108 NSResult NSConsumerPushEvent(NSTask * task)
110 NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
111 NS_VERIFY_NOT_NULL(thread, NS_ERROR);
113 NSDestroyThreadHandle(thread);
// Shuts the consumer message handler down: terminates the listener, cancels
// all subscriptions, processes the tasks still waiting in the queue, destroys
// the queue, stops the handler thread, and finally destroys the internal
// cached list. The published queue and thread-handle pointers are reset to
// NULL along the way.
// NOTE(review): neither `queue` nor `thread` is NULL-checked in the visible
// lines before being used - confirm this is never called without a prior
// successful init.
119 void NSConsumerMessageHandlerExit()
122 NSConsumerListenerTermiate();
123 NSCancelAllSubscription();
126 NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
127 NSConsumerThread * thread = *(NSGetMsgHandleThreadHandle());
// Drain the queue between NSThreadLock() and NSThreadUnlock() on the handler
// thread.
129 NSThreadLock(thread);
130 NS_LOG(DEBUG, "Execute remaining task");
131 while (!NSIsQueueEmpty(queue))
133 NSConsumerQueueObject * obj = NSPopQueue(queue);
// NOTE(review): this log statement dereferences `obj` (and obj->data); no
// NULL check on the popped object is visible before it in this excerpt.
134 NS_LOG_V(DEBUG, "Execute remaining task type : %d", ((NSTask *)(obj->data))->taskType);
138 NSConsumerTaskProcessing((NSTask *)(obj->data));
142 NSThreadUnlock(thread);
144 NSDestroyQueue(queue);
// Clear the published queue pointer once the queue has been destroyed.
146 NSSetMsgHandleQueue(NULL);
// Stop the thread and clear the published handle under the thread lock.
148 NSThreadLock(thread);
149 NSThreadStop(thread);
150 NSSetMsgHandleThreadHandle(NULL);
151 NSThreadUnlock(thread);
154 NSDestroyInternalCachedList();
// Thread entry point of the consumer message handler. It pops queue objects
// from the published message queue and passes the NSTask carried in each
// object's `data` field to NSConsumerTaskProcessing().
// NOTE(review): the loop construct, the bodies of the `if` statements and the
// return statement are not visible in this excerpt; the comments below
// describe only the statements that are shown.
157 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
159 NSConsumerQueueObject * obj = NULL;
161 NS_LOG(DEBUG, "create thread for consumer message handle");
162 NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
163 NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
// The handle and the queue are re-read from their published slots rather than
// cached, since NSConsumerMessageHandlerExit() resets both to NULL.
167 queueHandleThread = *(NSGetMsgHandleThreadHandle());
168 if (NULL == queueHandleThread)
173 NSConsumerQueue * queue = *(NSGetMsgHandleQueue());
177 queue = *(NSGetMsgHandleQueue());
// Thread no longer marked as started and nothing left to process.
181 if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
183 NS_LOG(ERROR, "msg handler thread will be terminated");
187 if (NSIsQueueEmpty(queue))
// Pop and process one queue object between NSThreadLock() and
// NSThreadUnlock() on the handler thread.
193 NSThreadLock(queueHandleThread);
194 NS_LOG(DEBUG, "msg handler working");
195 queue = *(NSGetMsgHandleQueue());
196 obj = NSPopQueue(queue);
200 NSConsumerTaskProcessing((NSTask *)(obj->data));
204 NSThreadUnlock(queueHandleThread);
// Thread entry point started by NSConsumerPushEvent(). It allocates a
// NSConsumerQueueObject and pushes it onto the published message queue
// between NSThreadLock() and NSThreadUnlock() on the message-handler thread.
// `data` is the argument given to NSThreadInit() (the NSTask passed to
// NSConsumerPushEvent()).
// NOTE(review): the lines that fill in `obj`, the condition guarding the
// "NSQueue is null" log, and the return statement are not visible in this
// excerpt - presumably `data` is attached to `obj` before the push; confirm.
211 void * NSConsumerMsgPushThreadFunc(void * data)
213 NSConsumerQueueObject * obj = NULL;
214 NSConsumerQueue * queue = NULL;
216 NS_LOG(DEBUG, "get queueThread handle");
217 NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
// If no handler thread is published, NSOICFree(data) is supplied as the
// cleanup action.
218 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
220 NS_LOG(DEBUG, "create queue object");
221 obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
// The same cleanup action is supplied for an allocation failure.
222 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
227 NSThreadLock(msgHandleThread);
// The queue pointer is read only after the lock has been taken.
229 queue = *(NSGetMsgHandleQueue());
232 NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
238 NSPushConsumerQueue(queue, obj);
241 NSThreadUnlock(msgHandleThread);
// Cleanup helper used by the subscribe-cancel / provider-deleted branch of
// NSConsumerTaskProcessing().
//   task  - when it and its taskData are non-NULL, taskData is released with
//           NSRemoveProvider() for TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, or
//           with NSOICFree() for TASK_CONSUMER_PROVIDER_DELETED.
//   prov1 - passed to NSRemoveProvider_internal().
//   prov2 - passed to NSRemoveProvider_internal().
// NOTE(review): any guards around the two NSRemoveProvider_internal() calls,
// and whether `task` itself is released, are not visible in this excerpt.
246 void NSProviderDeletedPostClean(
247 NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
249 if (task && task->taskData)
// taskData is a NSProvider for a cancel request ...
251 if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
253 NSRemoveProvider((NSProvider *) task->taskData);
// ... and is released with a plain free for a provider-deleted event.
255 else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
257 NSOICFree(task->taskData);
264 NSRemoveProvider_internal(prov1);
269 NSRemoveProvider_internal(prov2);
// Dispatches one consumer task to the sub-handler matching task->taskType.
//   task - the task to process; the call is a no-op check-and-return via
//          NS_VERIFY_NOT_NULL_V when it is NULL.
// Depending on the type the task is forwarded to the discovery,
// communication, internal or MQ task processor; unknown types are logged.
// NOTE(review): braces, `break` statements and any per-case release of `task`
// are not visible in this excerpt; only the visible statements are documented.
273 void NSConsumerTaskProcessing(NSTask * task)
275 NS_VERIFY_NOT_NULL_V(task);
276 switch (task->taskType)
// Connection events and explicit discover requests go to discovery.
278 case TASK_EVENT_CONNECTED:
279 case TASK_EVENT_CONNECTED_TCP:
280 case TASK_CONSUMER_REQ_DISCOVER:
282 NSConsumerDiscoveryTaskProcessing(task);
// Subscribe request: look up the cached provider by the providerId of the
// NSProvider in taskData, wrap it in a new task for the communication
// handler, then remove the NSProvider that came with the request.
285 case TASK_CONSUMER_REQ_SUBSCRIBE:
287 NSProvider_internal * prov =
288 NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
289 NS_VERIFY_NOT_NULL_V(prov);
290 NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
291 NS_VERIFY_NOT_NULL_V(subTask);
292 NSConsumerCommunicationTaskProcessing(subTask);
294 NSRemoveProvider((NSProvider *)task->taskData);
// These types are forwarded to the communication handler unchanged.
298 case TASK_SEND_SYNCINFO:
299 case TASK_CONSUMER_REQ_TOPIC_LIST:
300 case TASK_CONSUMER_SELECT_TOPIC_LIST:
302 NSConsumerCommunicationTaskProcessing(task);
// Subscribe-cancel and provider-deleted share one path: resolve the provider
// (by providerId for a cancel request, by device address for a deleted
// event), then issue a cancel task to the communication handler with the
// provider and a second cancel task to the internal handler with a copy.
305 case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
306 case TASK_CONSUMER_PROVIDER_DELETED:
308 NSProvider_internal * data = NULL;
310 if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
312 data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
313 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
314 data, NSProviderDeletedPostClean(task, NULL, NULL));
316 else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
318 data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
319 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
320 data, NSProviderDeletedPostClean(task, NULL, NULL));
// Copy for the internal handler; `data` is cleaned up if the copy fails.
323 NSProvider_internal * data2 = NSCopyProvider_internal(data);
324 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
325 data2, NSProviderDeletedPostClean(task, data, NULL));
327 NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
328 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
329 conTask, NSProviderDeletedPostClean(task, data, data2));
330 NSConsumerCommunicationTaskProcessing(conTask);
// Verify the task that was just created (conTask2). The previous code tested
// conTask again, so a failed NSMakeTask() here went unnoticed and a NULL task
// was passed on. Only data2 is cleaned up at this point because `data` has
// already been handed to the communication handler via conTask.
332 NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
333 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
334 conTask2, NSProviderDeletedPostClean(task, NULL, data2));
335 NSConsumerInternalTaskProcessing(conTask2);
// Release what the original task carried; both providers now belong to the
// tasks created above.
337 NSProviderDeletedPostClean(task, NULL, NULL);
// These types are forwarded to the internal handler unchanged.
340 case TASK_RECV_SYNCINFO:
341 case TASK_CONSUMER_RECV_MESSAGE:
342 case TASK_CONSUMER_SENT_REQ_OBSERVE:
343 case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
344 case TASK_MAKE_SYNCINFO:
345 case TASK_CONSUMER_REQ_TOPIC_URI:
346 case TASK_CONSUMER_RECV_TOPIC_LIST:
348 NSConsumerInternalTaskProcessing(task);
// Provider discovered: build a TASK_CONSUMER_REQ_TOPIC_LIST task carrying a
// copy of the provider for the communication handler, then pass the original
// task to the internal handler.
351 case TASK_CONSUMER_PROVIDER_DISCOVERED:
353 NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
354 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
356 NSRemoveProvider_internal((void *) task->taskData);
359 getTopicTask->nextTask = NULL;
// NOTE(review): the result of NSCopyProvider_internal() is stored without a
// NULL check - confirm the communication handler tolerates NULL taskData.
360 getTopicTask->taskData =
361 (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
362 getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
363 NSConsumerCommunicationTaskProcessing(getTopicTask);
364 NSConsumerInternalTaskProcessing(task);
// MQ subscribe requests go to the MQ plugin.
368 case TASK_MQ_REQ_SUBSCRIBE:
370 NSConsumerMQTaskProcessing(task);
376 NS_LOG(ERROR, "Unknown type of task");
// Looks up a provider in the provider cache by its ID.
//   providerId - ID string to search for; checked with NS_VERIFY_NOT_NULL,
//                which is given NULL as its second argument (presumably the
//                value returned when providerId is NULL).
// Returns whatever NSProviderCacheFind() returns for `providerId`.
382 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
384 NS_VERIFY_NOT_NULL(providerId, NULL);
386 return NSProviderCacheFind(providerId);