Removed NSMessage_consumer structure.
[contrib/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSConsumerScheduler.h"
22
23 #include <stdlib.h>
24 #include <stdbool.h>
25 #include <unistd.h>
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocrandom.h"
30
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
35
36 #include "NSThread.h"
37 #include "NSConsumerQueue.h"
38
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
43
44 void * NSConsumerMsgHandleThreadFunc(void * handle);
45
46 void * NSConsumerMsgPushThreadFunc(void * data);
47
48 void NSConsumerTaskProcessing(NSTask * task);
49
50 NSConsumerThread ** NSGetMsgHandleThreadHandle()
51 {
52     static NSConsumerThread * handle = NULL;
53     return & handle;
54 }
55
56 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
57 {
58    *(NSGetMsgHandleThreadHandle()) = handle;
59 }
60
61 NSConsumerQueue ** NSGetMsgHandleQueue()
62 {
63     static NSConsumerQueue * queue = NULL;
64     return & queue;
65 }
66
67 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
68 {
69    *(NSGetMsgHandleQueue()) = queue;
70 }
71
72 NSResult NSConsumerMessageHandlerInit()
73 {
74     NSConsumerThread * handle = NULL;
75     NSConsumerQueue * queue = NULL;
76
77     uint8_t uuid[UUID_SIZE];
78     char uuidString[UUID_STRING_SIZE];
79     OCGenerateUuid(uuid);
80     OCConvertUuidToString(uuid, uuidString);
81     NSSetConsumerId(uuidString);
82     NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
83
84     NS_LOG(DEBUG, "listener init");
85     NSResult ret = NSConsumerListenerInit();
86     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
87
88     NS_LOG(DEBUG, "system init");
89     ret = NSConsumerSystemInit();
90     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
91
92     NS_LOG(DEBUG, "queue thread init");
93     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
94     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
95     NSSetMsgHandleThreadHandle(handle);
96
97     NS_LOG(DEBUG, "create queue");
98     queue = NSCreateQueue();
99     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
100     NSSetMsgHandleQueue(queue);
101
102     return NS_OK;
103 }
104
105 NSResult NSConsumerPushEvent(NSTask * task)
106 {
107     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
108     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
109
110     return NS_OK;
111 }
112
113 void NSConsumerMessageHandlerExit()
114 {
115
116     NSConsumerListenerTermiate();
117     NSCancelAllSubscription();
118     NSThreadStop(*(NSGetMsgHandleThreadHandle()));
119     NSDestroyQueue(*(NSGetMsgHandleQueue()));
120     NSSetMsgHandleQueue(NULL);
121
122     NSDestroyMessageCacheList();
123     NSDestroyProviderCacheList();
124 }
125
126 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
127 {
128     NSConsumerQueue * queue = *(NSGetMsgHandleQueue());;
129     NSConsumerQueueObject * obj = NULL;
130
131     NS_LOG(DEBUG, "create thread for consumer message handle");
132     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
133     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
134
135     while (true)
136     {
137         if (!queue)
138         {
139             queue = *(NSGetMsgHandleQueue());
140             usleep(2000);
141             continue;
142         }
143
144         if (!queueHandleThread->isStarted && NSIsQueueEmpty(queue))
145         {
146             NS_LOG(ERROR, "msg handler thread will be terminated");
147             break;
148         }
149
150         if (NSIsQueueEmpty(queue))
151         {
152             usleep(2000);
153             continue;
154         }
155
156         NSThreadLock(queueHandleThread);
157         NS_LOG(DEBUG, "msg handler working");
158         obj = NSPopQueue(queue);
159
160         if (obj)
161         {
162             NSConsumerTaskProcessing((NSTask *)(obj->data));
163         }
164
165         NSThreadUnlock(queueHandleThread);
166
167     }
168
169     return NULL;
170 }
171
172 void * NSConsumerMsgPushThreadFunc(void * data)
173 {
174     NSConsumerQueueObject * obj = NULL;
175     NSConsumerQueue * queue = NULL;
176
177     NS_LOG(DEBUG, "get queueThread handle");
178     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
179     NS_VERIFY_NOT_NULL(msgHandleThread, NULL);
180
181     NS_LOG(DEBUG, "create queue object");
182     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
183     NS_VERIFY_NOT_NULL(obj, NULL);
184
185     obj->data = data;
186     obj->next = NULL;
187
188     NSThreadLock(msgHandleThread);
189
190     queue = *(NSGetMsgHandleQueue());
191     if (!queue)
192     {
193         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
194         NSOICFree(data);
195         NSOICFree(obj);
196     }
197     else
198     {
199         NSPushQueue(queue, obj);
200     }
201
202     NSThreadUnlock(msgHandleThread);
203
204     return NULL;
205 }
206
207 void NSConsumerTaskProcessing(NSTask * task)
208 {
209     switch (task->taskType)
210     {
211     case TASK_EVENT_CONNECTED:
212     case TASK_EVENT_CONNECTED_TCP:
213     case TASK_CONSUMER_REQ_DISCOVER:
214     {
215         NSConsumerDiscoveryTaskProcessing(task);
216         break;
217     }
218     case TASK_CONSUMER_REQ_SUBSCRIBE:
219     case TASK_SEND_SYNCINFO:
220     {
221         NSConsumerCommunicationTaskProcessing(task);
222         break;
223     }
224     case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
225     {
226         NSProvider_internal * data = NSCopyProvider((NSProvider_internal *)task->taskData);
227         NS_VERIFY_NOT_NULL_V(data);
228         NSTask * conTask = NSMakeTask(TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL, data);
229         NS_VERIFY_NOT_NULL_V(conTask);
230         NSConsumerCommunicationTaskProcessing(task);
231         NSConsumerInternalTaskProcessing(conTask);
232         break;
233     }
234     case TASK_RECV_SYNCINFO:
235     case TASK_CONSUMER_RECV_MESSAGE:
236     case TASK_CONSUMER_PROVIDER_DISCOVERED:
237     case TASK_CONSUMER_RECV_SUBSCRIBE_CONFIRMED:
238     case TASK_MAKE_SYNCINFO:
239     {
240         NSConsumerInternalTaskProcessing(task);
241         break;
242     }
243     default:
244         NS_LOG(ERROR, "Unknown type of task");
245         break;
246     }
247 }
248
249 NSMessage * NSConsumerFindNSMessage(const char* messageId)
250 {
251     NS_VERIFY_NOT_NULL(messageId, NULL);
252
253     return NSMessageCacheFind(messageId);
254 }
255
256 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
257 {
258     NS_VERIFY_NOT_NULL(providerId, NULL);
259
260     return NSProviderCacheFind(providerId);
261 }