1 //******************************************************************
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "NSConsumerScheduler.h"
27 #include "oic_malloc.h"
28 #include "oic_string.h"
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
37 #include "NSConsumerQueue.h"
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
// Forward declarations for functions that are defined further down in this
// file but referenced before their definitions.
// Thread entry point used for the message-handler thread.
44 void * NSConsumerMsgHandleThreadFunc(void * handle);
// Thread entry point used by NSConsumerPushEvent to enqueue one task.
46 void * NSConsumerMsgPushThreadFunc(void * data);
// Dispatches a single task according to its taskType.
48 void NSConsumerTaskProcessing(NSTask * task);
// Accessor for the slot that holds the message-handler thread handle.
// The slot is a function-local static pointer that starts out as NULL;
// callers in this file dereference the returned pointer to read or write it.
// NOTE(review): the return statement is not visible in this view —
// presumably the address of `handle` is returned; confirm.
50 NSConsumerThread ** NSGetMsgHandleThreadHandle()
52 static NSConsumerThread * handle = NULL;
// Stores `handle` (may be NULL) in the slot exposed by
// NSGetMsgHandleThreadHandle(). The previous value is overwritten without
// being released here.
56 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
58 *(NSGetMsgHandleThreadHandle()) = handle;
// Accessor for the slot that holds the message-handler task queue.
// The slot is a function-local static pointer that starts out as NULL;
// callers in this file dereference the returned pointer to read or write it.
// NOTE(review): the return statement is not visible in this view —
// presumably the address of `queue` is returned; confirm.
61 NSConsumerQueue ** NSGetMsgHandleQueue()
63 static NSConsumerQueue * queue = NULL;
// Stores `queue` (may be NULL) in the slot exposed by NSGetMsgHandleQueue().
// The previous value is overwritten without being released here.
67 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
69 *(NSGetMsgHandleQueue()) = queue;
// Brings up consumer-side message handling: records the consumer id,
// initialises the network listener and the consumer system, starts the
// message-handler thread and creates the task queue.
// Each step is guarded by NS_VERIFY_NOT_NULL(..., NS_ERROR). The macro is
// defined outside this file — presumably it returns NS_ERROR from this
// function when its first argument is NULL; confirm.
// NOTE(review): the success return is not visible in this view.
72 NSResult NSConsumerMessageHandlerInit()
74 NSConsumerThread * handle = NULL;
75 NSConsumerQueue * queue = NULL;
// The stack's server instance id string is used as the consumer id.
77 char * consumerUuid = (char *)OCGetServerInstanceIDString();
78 NS_VERIFY_NOT_NULL(consumerUuid, NS_ERROR);
80 NSSetConsumerId(consumerUuid);
81 NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
83 NS_LOG(DEBUG, "listener init");
84 NSResult ret = NSConsumerListenerInit();
// The status code is mapped onto a non-NULL / NULL pointer so that the
// pointer-checking macro can be reused for an NSResult.
85 NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
87 NS_LOG(DEBUG, "system init");
88 ret = NSConsumerSystemInit();
89 NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
91 NS_LOG(DEBUG, "queue thread init");
// The handler thread is created before the queue exists; the thread
// function fetches the queue from the shared slot on its own.
92 handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
93 NS_VERIFY_NOT_NULL(handle, NS_ERROR);
94 NSSetMsgHandleThreadHandle(handle);
96 NS_LOG(DEBUG, "create queue");
97 queue = NSCreateQueue();
// NOTE(review): if queue creation fails, nothing visible here stops the
// thread started above or undoes the listener/system init — confirm whether
// the caller is expected to call NSConsumerMessageHandlerExit().
98 NS_VERIFY_NOT_NULL(queue, NS_ERROR);
99 NSSetMsgHandleQueue(queue);
// Hands `task` to the message handler asynchronously: a new thread running
// NSConsumerMsgPushThreadFunc is created with `task` as its argument, and
// the thread handle is then destroyed.
// NS_ERROR is the failure value passed to the verify macro when the thread
// cannot be created.
// NOTE(review): the success return is not visible in this view; whether
// NSDestroyThreadHandle waits for the push thread to finish is defined
// elsewhere — confirm.
104 NSResult NSConsumerPushEvent(NSTask * task)
106 NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
107 NS_VERIFY_NOT_NULL(thread, NS_ERROR);
109 NSDestroyThreadHandle(thread);
// Tears down what NSConsumerMessageHandlerInit set up, in this order:
// listener, subscriptions, handler thread, task queue, internal cache.
114 void NSConsumerMessageHandlerExit()
// ("Termiate" is the spelling of the external function's name.)
117 NSConsumerListenerTermiate();
118 NSCancelAllSubscription();
// Stop the handler thread, then clear the shared slot so later lookups
// see NULL.
120 NSThreadStop(*(NSGetMsgHandleThreadHandle()));
121 NSSetMsgHandleThreadHandle(NULL);
// Destroy the queue, then clear its shared slot as well.
123 NSDestroyQueue(*(NSGetMsgHandleQueue()));
124 NSSetMsgHandleQueue(NULL);
126 NSDestroyInternalCachedList();
// Entry point of the message-handler thread: pops queue objects and passes
// their payload to NSConsumerTaskProcessing.
// threadHandle is interpreted as the NSConsumerThread owning this thread.
// NOTE(review): the loop construct, any sleeps/continues, the release of
// popped objects and the return statement are not visible in this view.
// NOTE(review): NSConsumerMessageHandlerInit passes NULL as the data
// argument of NSThreadInit; presumably NSThreadInit supplies the thread
// handle as this function's argument instead — confirm, otherwise the
// NULL check below would end the thread immediately.
129 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
// (The doubled ';' is an empty statement and has no effect.)
131 NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
132 NSConsumerQueueObject * obj = NULL;
134 NS_LOG(DEBUG, "create thread for consumer message handle");
135 NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
136 NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
// The queue pointer is re-read from the shared slot; the init routine
// creates the queue only after this thread has been started.
142 queue = *(NSGetMsgHandleQueue());
// Termination condition: the thread is no longer marked as started and
// there is nothing left in the queue.
147 if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
149 NS_LOG(ERROR, "msg handler thread will be terminated");
// Nothing queued at the moment.
153 if (NSIsQueueEmpty(queue))
// The thread lock is taken before popping and released only after the
// popped task has been processed.
159 NSThreadLock(queueHandleThread);
160 NS_LOG(DEBUG, "msg handler working");
161 obj = NSPopQueue(queue);
// The queue object's payload is the NSTask to dispatch.
165 NSConsumerTaskProcessing((NSTask *)(obj->data));
168 NSThreadUnlock(queueHandleThread);
// Entry point of the thread created by NSConsumerPushEvent: allocates a
// queue object for `data` and pushes it onto the message-handler queue
// while holding the handler thread's lock.
// On the two visible early-failure checks, `data` is released with
// NSOICFree as the macro's cleanup expression.
// NOTE(review): the initialisation of the queue object's fields, the
// condition guarding the "NSQueue is null" branch and the return statement
// are not visible in this view.
175 void * NSConsumerMsgPushThreadFunc(void * data)
179 NSConsumerQueueObject * obj = NULL;
180 NSConsumerQueue * queue = NULL;
182 NS_LOG(DEBUG, "get queueThread handle");
// Without a handler thread there is no lock to take and nobody to consume
// the task.
183 NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
184 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(msgHandleThread, NULL, NSOICFree(data));
186 NS_LOG(DEBUG, "create queue object");
187 obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
188 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING(obj, NULL, NSOICFree(data));
// The queue is looked up only after the lock is held.
193 NSThreadLock(msgHandleThread);
195 queue = *(NSGetMsgHandleQueue());
198 NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
204 NSPushQueue(queue, obj);
207 NSThreadUnlock(msgHandleThread);
// Shared cleanup used by the subscribe-cancel / provider-deleted handling
// in NSConsumerTaskProcessing.
// task:  when it and its taskData are non-NULL, the payload is released
//        according to taskType — NSRemoveProvider for
//        TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, NSOICFree for
//        TASK_CONSUMER_PROVIDER_DELETED.
// prov1, prov2: handed to NSRemoveProvider_internal.
// NOTE(review): callers in this file pass NULL for prov1 and/or prov2; any
// NULL guards around the two NSRemoveProvider_internal calls, and the
// release of `task` itself, are not visible in this view — confirm.
212 void NSProviderDeletedPostClean(
213 NSTask * task, NSProvider_internal * prov1, NSProvider_internal * prov2)
215 if (task && task->taskData)
217 if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
// For a cancel request the payload is treated as an NSProvider.
219 NSRemoveProvider((NSProvider *) task->taskData);
221 else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
// For a provider-deleted event the payload is released as a plain
// allocation.
223 NSOICFree(task->taskData);
230 NSRemoveProvider_internal(prov1);
235 NSRemoveProvider_internal(prov2);
// Dispatches one task to the discovery, communication or internal task
// processor according to task->taskType. Some task types are first
// translated into one or two derived tasks.
// NOTE(review): `task` is dereferenced without a NULL check; the handler
// thread passes obj->data straight in.
// NOTE(review): the case terminators, the default label and any release of
// `task` itself are not visible in this view.
239 void NSConsumerTaskProcessing(NSTask * task)
241 switch (task->taskType)
// Connection events and discovery requests go to the discovery processor.
243 case TASK_EVENT_CONNECTED:
244 case TASK_EVENT_CONNECTED_TCP:
245 case TASK_CONSUMER_REQ_DISCOVER:
247 NSConsumerDiscoveryTaskProcessing(task);
// Subscribe request: the payload is an NSProvider; its providerId is used to
// look the provider up, and a new subscribe task carrying that lookup result
// is sent to the communication processor.
250 case TASK_CONSUMER_REQ_SUBSCRIBE:
252 NSProvider_internal * prov =
253 NSConsumerFindNSProvider(((NSProvider *)task->taskData)->providerId);
// NOTE(review): on the two early exits below, nothing visible releases
// task->taskData (released on the success path) or, for the second one,
// `prov` — confirm whether these paths leak.
254 NS_VERIFY_NOT_NULL_V(prov);
255 NSTask * subTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE, prov);
256 NS_VERIFY_NOT_NULL_V(subTask);
257 NSConsumerCommunicationTaskProcessing(subTask);
// The original request payload is released once the derived task has been
// handed over.
259 NSRemoveProvider((NSProvider *)task->taskData);
// These task types are forwarded unchanged to the communication processor.
263 case TASK_SEND_SYNCINFO:
264 case TASK_CONSUMER_REQ_TOPIC_LIST:
265 case TASK_CONSUMER_SELECT_TOPIC_LIST:
267 NSConsumerCommunicationTaskProcessing(task);
// Subscribe-cancel and provider-deleted share one path: resolve the
// provider, copy it, then issue a cancel task to the communication
// processor (with the resolved provider) and another to the internal
// processor (with the copy).
270 case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
271 case TASK_CONSUMER_PROVIDER_DELETED:
273 NSProvider_internal * data = NULL;
275 if (task->taskType == TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL)
// Cancel request: payload is an NSProvider, resolved by providerId.
277 data = NSConsumerFindNSProvider(((NSProvider *) task->taskData)->providerId);
278 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
279 data, NSProviderDeletedPostClean(task, NULL, NULL));
281 else if (task->taskType == TASK_CONSUMER_PROVIDER_DELETED)
// Provider deleted: payload is an OCDevAddr, resolved by address.
283 data = NSFindProviderFromAddr((OCDevAddr *) task->taskData);
284 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
285 data, NSProviderDeletedPostClean(task, NULL, NULL));
// A second, independent copy is made so that each derived task gets its
// own provider object.
288 NSProvider_internal * data2 = NSCopyProvider_internal(data);
289 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
290 data2, NSProviderDeletedPostClean(task, data, NULL));
292 NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
293 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
294 conTask, NSProviderDeletedPostClean(task, data, data2));
295 NSConsumerCommunicationTaskProcessing(conTask);
297 NSTask * conTask2 = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data2);
// NOTE(review): likely bug — this verifies conTask, which was already
// checked above, rather than the freshly created conTask2. If NSMakeTask
// fails here, a NULL conTask2 is passed on to
// NSConsumerInternalTaskProcessing. The cleanup arguments (data2 only)
// suggest conTask2 was the intended operand.
298 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(
299 conTask, NSProviderDeletedPostClean(task, NULL, data2));
300 NSConsumerInternalTaskProcessing(conTask2);
// Only the original task is cleaned up here; data and data2 were handed to
// the derived tasks.
302 NSProviderDeletedPostClean(task, NULL, NULL);
// These task types are forwarded unchanged to the internal processor.
305 case TASK_RECV_SYNCINFO:
306 case TASK_CONSUMER_RECV_MESSAGE:
307 case TASK_CONSUMER_SENT_REQ_OBSERVE:
308 case TASK_CONSUMER_RECV_PROVIDER_CHANGED:
309 case TASK_MAKE_SYNCINFO:
310 case TASK_CONSUMER_REQ_TOPIC_URI:
311 case TASK_CONSUMER_RECV_TOPIC_LIST:
313 NSConsumerInternalTaskProcessing(task);
// Newly discovered provider: a topic-list request carrying a copy of the
// provider is sent to the communication processor, then the original task
// goes to the internal processor.
316 case TASK_CONSUMER_PROVIDER_DISCOVERED:
318 NSTask * getTopicTask = (NSTask *)OICMalloc(sizeof(NSTask));
// On allocation failure the discovered provider payload is released.
319 NS_VERIFY_NOT_NULL_WITH_POST_CLEANING_V(getTopicTask,
320 NSRemoveProvider_internal((NSProvider_internal *) task->taskData));
321 getTopicTask->nextTask = NULL;
// NOTE(review): the result of NSCopyProvider_internal is not checked before
// the task is dispatched; a failed copy would send a task with NULL
// taskData — confirm the communication processor tolerates that.
322 getTopicTask->taskData =
323 (void *) NSCopyProvider_internal((NSProvider_internal *) task->taskData);
324 getTopicTask->taskType = TASK_CONSUMER_REQ_TOPIC_LIST;
325 NSConsumerCommunicationTaskProcessing(getTopicTask);
326 NSConsumerInternalTaskProcessing(task);
// Fallback for task types not handled above.
331 NS_LOG(ERROR, "Unknown type of task");
// Looks up a provider in the provider cache by its id.
// providerId: checked with NS_VERIFY_NOT_NULL(providerId, NULL) before the
//             lookup.
// Returns whatever NSProviderCacheFind returns for that id.
// NOTE(review): ownership of the returned object is decided by
// NSProviderCacheFind, which is defined elsewhere; callers in this file
// pass the result on to NSMakeTask or to NSRemoveProvider_internal —
// confirm it is a caller-owned copy.
337 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
339 NS_VERIFY_NOT_NULL(providerId, NULL);
341 return NSProviderCacheFind(providerId);