1 //******************************************************************
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "NSConsumerScheduler.h"
27 #include "oic_malloc.h"
28 #include "oic_string.h"
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
37 #include "NSConsumerQueue.h"
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
44 void * NSConsumerMsgHandleThreadFunc(void * handle);
46 void * NSConsumerMsgPushThreadFunc(void * data);
48 void NSConsumerTaskProcessing(NSTask * task);
50 NSConsumerThread ** NSGetMsgHandleThreadHandle()
52 static NSConsumerThread * handle = NULL;
56 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
58 *(NSGetMsgHandleThreadHandle()) = handle;
61 NSConsumerQueue ** NSGetMsgHandleQueue()
63 static NSConsumerQueue * queue = NULL;
67 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
69 *(NSGetMsgHandleQueue()) = queue;
72 NSResult NSConsumerMessageHandlerInit()
74 NSConsumerThread * handle = NULL;
75 NSConsumerQueue * queue = NULL;
77 uint8_t uuid[UUID_SIZE];
78 char uuidString[UUID_STRING_SIZE];
80 OCConvertUuidToString(uuid, uuidString);
81 NSSetConsumerId(uuidString);
82 NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
84 NS_LOG(DEBUG, "listener init");
85 NSResult ret = NSConsumerListenerInit();
86 NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
88 NS_LOG(DEBUG, "system init");
89 ret = NSConsumerSystemInit();
90 NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
92 NS_LOG(DEBUG, "queue thread init");
93 handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
94 NS_VERIFY_NOT_NULL(handle, NS_ERROR);
95 NSSetMsgHandleThreadHandle(handle);
97 NS_LOG(DEBUG, "create queue");
98 queue = NSCreateQueue();
99 NS_VERIFY_NOT_NULL(queue, NS_ERROR);
100 NSSetMsgHandleQueue(queue);
105 NSResult NSConsumerPushEvent(NSTask * task)
107 NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
108 NS_VERIFY_NOT_NULL(thread, NS_ERROR);
113 void NSConsumerMessageHandlerExit()
116 NSConsumerListenerTermiate();
117 NSCancelAllSubscription();
118 NSThreadStop(*(NSGetMsgHandleThreadHandle()));
119 NSDestroyQueue(*(NSGetMsgHandleQueue()));
120 NSSetMsgHandleQueue(NULL);
122 NSDestroyMessageCacheList();
123 NSDestroyProviderCacheList();
// Entry point of the message-handling thread; it is the function handed to
// NSThreadInit by NSConsumerMessageHandlerInit. It pops queue objects and
// passes each object's `data` to NSConsumerTaskProcessing as an NSTask.
//
// NOTE(review): this listing shows no braces, loop or branch bodies for this
// function, so the control flow around the statements below cannot be read
// from here - confirm against the complete file before changing anything.
// NOTE(review): NSThreadInit is called with NULL as its second argument, yet
// `threadHandle` is required to be non-NULL below - presumably NSThreadInit
// passes the thread handle it creates; confirm.
126 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
// Initial snapshot of the published queue; it may still be NULL at this point.
128 NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
129 NSConsumerQueueObject * obj = NULL;
131 NS_LOG(DEBUG, "create thread for consumer message handle");
132 NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
// Leaves the function with NULL when no thread handle was supplied.
133 NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
// Re-reads the published queue pointer.
139 queue = *(NSGetMsgHandleQueue());
// Termination condition: the thread is no longer marked started and nothing
// is left in the queue.
144 if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
146 NS_LOG(ERROR, "msg handler thread will be terminated");
// Nothing to process right now.
150 if (NSIsQueueEmpty(queue))
// Pop and dispatch happen between NSThreadLock and NSThreadUnlock on the
// handler's own handle, the same handle NSConsumerMsgPushThreadFunc locks
// around its push.
156 NSThreadLock(queueHandleThread);
157 NS_LOG(DEBUG, "msg handler working");
158 obj = NSPopQueue(queue);
// NOTE(review): no release of `obj` is visible after dispatch, although queue
// objects are allocated with OICMalloc by the push thread - confirm ownership.
162 NSConsumerTaskProcessing((NSTask *)(obj->data));
165 NSThreadUnlock(queueHandleThread);
// Entry point of the short-lived push thread; NSConsumerPushEvent passes it to
// NSThreadInit together with the task as `data`. It allocates a queue object
// and pushes it onto the message queue while holding the message-handler
// thread's lock.
//
// NOTE(review): this listing shows no braces or branch structure for this
// function, and no visible statement stores `data` into the new queue object;
// confirm against the complete file.
172 void * NSConsumerMsgPushThreadFunc(void * data)
174 NSConsumerQueueObject * obj = NULL;
175 NSConsumerQueue * queue = NULL;
177 NS_LOG(DEBUG, "get queueThread handle");
// The handler thread's handle is what gets locked below; without it the
// function leaves with NULL.
178 NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
179 NS_VERIFY_NOT_NULL(msgHandleThread, NULL);
181 NS_LOG(DEBUG, "create queue object");
182 obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
// Leaves with NULL when the allocation failed.
183 NS_VERIFY_NOT_NULL(obj, NULL);
188 NSThreadLock(msgHandleThread);
// The queue pointer is read only after the lock has been taken.
190 queue = *(NSGetMsgHandleQueue());
// Presumably reached only when `queue` is NULL (the guarding condition is not
// visible in this listing) - TODO confirm, including what happens to `obj`
// and `data` on that path.
193 NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
199 NSPushQueue(queue, obj);
202 NSThreadUnlock(msgHandleThread);
207 void NSConsumerTaskProcessing(NSTask * task)
209 switch (task->taskType)
211 case TASK_EVENT_CONNECTED:
212 case TASK_EVENT_CONNECTED_TCP:
213 case TASK_CONSUMER_REQ_DISCOVER:
215 NSConsumerDiscoveryTaskProcessing(task);
218 case TASK_CONSUMER_REQ_SUBSCRIBE:
219 case TASK_SEND_SYNCINFO:
221 NSConsumerCommunicationTaskProcessing(task);
224 case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
226 NSProvider_internal * data = NSCopyProvider((NSProvider_internal *)task->taskData);
227 NS_VERIFY_NOT_NULL_V(data);
228 NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
229 NS_VERIFY_NOT_NULL_V(conTask);
230 NSConsumerCommunicationTaskProcessing(task);
231 NSConsumerInternalTaskProcessing(conTask);
234 case TASK_RECV_SYNCINFO:
235 case TASK_CONSUMER_RECV_MESSAGE:
236 case TASK_CONSUMER_PROVIDER_DISCOVERED:
237 case TASK_CONSUMER_RECV_SUBSCRIBE_CONFIRMED:
238 case TASK_MAKE_SYNCINFO:
240 NSConsumerInternalTaskProcessing(task);
244 NS_LOG(ERROR, "Unknown type of task");
249 NSMessage_consumer * NSConsumerFindNSMessage(const char* messageId)
251 NS_VERIFY_NOT_NULL(messageId, NULL);
253 return NSMessageCacheFind(messageId);
256 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
258 NS_VERIFY_NOT_NULL(providerId, NULL);
260 return NSProviderCacheFind(providerId);