Modify Consumer to support Topic functionality
[contrib/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSConsumerScheduler.h"
22
23 #include <stdlib.h>
24 #include <stdbool.h>
25 #include <unistd.h>
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocrandom.h"
30
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
35
36 #include "NSThread.h"
37 #include "NSConsumerQueue.h"
38
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
43
44 void * NSConsumerMsgHandleThreadFunc(void * handle);
45
46 void * NSConsumerMsgPushThreadFunc(void * data);
47
48 void NSConsumerTaskProcessing(NSTask * task);
49
50 NSConsumerThread ** NSGetMsgHandleThreadHandle()
51 {
52     static NSConsumerThread * handle = NULL;
53     return & handle;
54 }
55
56 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
57 {
58    *(NSGetMsgHandleThreadHandle()) = handle;
59 }
60
61 NSConsumerQueue ** NSGetMsgHandleQueue()
62 {
63     static NSConsumerQueue * queue = NULL;
64     return & queue;
65 }
66
67 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
68 {
69    *(NSGetMsgHandleQueue()) = queue;
70 }
71
72 NSResult NSConsumerMessageHandlerInit()
73 {
74     NSConsumerThread * handle = NULL;
75     NSConsumerQueue * queue = NULL;
76
77     uint8_t uuid[UUID_SIZE] = {0,};
78     char uuidString[UUID_STRING_SIZE] = {0,};
79     OCRandomUuidResult randomRet = OCGenerateUuid(uuid);
80     NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
81     randomRet = OCConvertUuidToString(uuid, uuidString);
82     NS_VERIFY_NOT_NULL(randomRet == RAND_UUID_OK ? (void *) 1 : NULL, NS_ERROR);
83
84     NSSetConsumerId(uuidString);
85     NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
86
87     NS_LOG(DEBUG, "listener init");
88     NSResult ret = NSConsumerListenerInit();
89     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
90
91     NS_LOG(DEBUG, "system init");
92     ret = NSConsumerSystemInit();
93     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
94
95     NS_LOG(DEBUG, "queue thread init");
96     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
97     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
98     NSSetMsgHandleThreadHandle(handle);
99
100     NS_LOG(DEBUG, "create queue");
101     queue = NSCreateQueue();
102     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
103     NSSetMsgHandleQueue(queue);
104
105     return NS_OK;
106 }
107
108 NSResult NSConsumerPushEvent(NSTask * task)
109 {
110     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
111     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
112
113     return NS_OK;
114 }
115
116 void NSConsumerMessageHandlerExit()
117 {
118
119     NSConsumerListenerTermiate();
120     NSCancelAllSubscription();
121     NSThreadStop(*(NSGetMsgHandleThreadHandle()));
122     NSDestroyQueue(*(NSGetMsgHandleQueue()));
123     NSSetMsgHandleQueue(NULL);
124
125     NSDestroyMessageCacheList();
126     NSDestroyProviderCacheList();
127 }
128
129 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
130 {
131     NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
132     NSConsumerQueueObject * obj = NULL;
133
134     NS_LOG(DEBUG, "create thread for consumer message handle");
135     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
136     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
137
138     while (true)
139     {
140         if (!queue)
141         {
142             queue = *(NSGetMsgHandleQueue());
143             usleep(2000);
144             continue;
145         }
146
147         if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
148         {
149             NS_LOG(ERROR, "msg handler thread will be terminated");
150             break;
151         }
152
153         if (NSIsQueueEmpty(queue))
154         {
155             usleep(2000);
156             continue;
157         }
158
159         NSThreadLock(queueHandleThread);
160         NS_LOG(DEBUG, "msg handler working");
161         obj = NSPopQueue(queue);
162
163         if (obj)
164         {
165             NSConsumerTaskProcessing((NSTask *)(obj->data));
166         }
167
168         NSThreadUnlock(queueHandleThread);
169
170     }
171
172     return NULL;
173 }
174
175 void * NSConsumerMsgPushThreadFunc(void * data)
176 {
177     NSConsumerQueueObject * obj = NULL;
178     NSConsumerQueue * queue = NULL;
179
180     NS_LOG(DEBUG, "get queueThread handle");
181     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
182     NS_VERIFY_NOT_NULL(msgHandleThread, NULL);
183
184     NS_LOG(DEBUG, "create queue object");
185     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
186     NS_VERIFY_NOT_NULL(obj, NULL);
187
188     obj->data = data;
189     obj->next = NULL;
190
191     NSThreadLock(msgHandleThread);
192
193     queue = *(NSGetMsgHandleQueue());
194     if (!queue)
195     {
196         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
197         NSOICFree(data);
198         NSOICFree(obj);
199     }
200     else
201     {
202         NSPushQueue(queue, obj);
203     }
204
205     NSThreadUnlock(msgHandleThread);
206
207     return NULL;
208 }
209
210 void NSConsumerTaskProcessing(NSTask * task)
211 {
212     switch (task->taskType)
213     {
214         case TASK_EVENT_CONNECTED:
215         case TASK_EVENT_CONNECTED_TCP:
216         case TASK_CONSUMER_REQ_DISCOVER:
217         {
218             NSConsumerDiscoveryTaskProcessing(task);
219             break;
220         }
221         case TASK_CONSUMER_REQ_SUBSCRIBE:
222         case TASK_SEND_SYNCINFO:
223         case TASK_CONSUMER_REQ_TOPIC_LIST:
224         case TASK_CONSUMER_GET_TOPIC_LIST:
225         case TASK_CONSUMER_SELECT_TOPIC_LIST:
226         {
227             NSConsumerCommunicationTaskProcessing(task);
228             break;
229         }
230         case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
231         {
232             NSProvider_internal * data = NSCopyProvider((NSProvider_internal *)task->taskData);
233             NS_VERIFY_NOT_NULL_V(data);
234             NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
235             NS_VERIFY_NOT_NULL_V(conTask);
236             NSConsumerCommunicationTaskProcessing(task);
237             NSConsumerInternalTaskProcessing(conTask);
238             break;
239         }
240         case TASK_RECV_SYNCINFO:
241         case TASK_CONSUMER_RECV_MESSAGE:
242         case TASK_CONSUMER_PROVIDER_DISCOVERED:
243         case TASK_CONSUMER_RECV_SUBSCRIBE_CONFIRMED:
244         case TASK_MAKE_SYNCINFO:
245         case TASK_CONSUMER_REQ_TOPIC_URI:
246         case TASK_CONSUMER_RECV_TOPIC_LIST:
247         {
248             NSConsumerInternalTaskProcessing(task);
249             break;
250         }
251         default:
252         {
253             NS_LOG(ERROR, "Unknown type of task");
254             break;
255         }
256     }
257 }
258
259 NSMessage * NSConsumerFindNSMessage(const char* messageId)
260 {
261     NS_VERIFY_NOT_NULL(messageId, NULL);
262
263     return NSMessageCacheFind(messageId);
264 }
265
266 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
267 {
268     NS_VERIFY_NOT_NULL(providerId, NULL);
269
270     return NSProviderCacheFind(providerId);
271 }