Fixed bug for duplicated request to subscribe and invalid storage logic.
[platform/upstream/iotivity.git] / service / notification / src / consumer / NSConsumerScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "NSConsumerScheduler.h"
22
23 #include <stdlib.h>
24 #include <stdbool.h>
25 #include <unistd.h>
26
27 #include "oic_malloc.h"
28 #include "oic_string.h"
29 #include "ocrandom.h"
30
31 #include "NSStructs.h"
32 #include "NSConstants.h"
33 #include "NSConsumerCommon.h"
34 #include "NSConsumerCommunication.h"
35
36 #include "NSThread.h"
37 #include "NSConsumerQueue.h"
38
39 #include "NSConsumerDiscovery.h"
40 #include "NSConsumerInternalTaskController.h"
41 #include "NSConsumerNetworkEventListener.h"
42 #include "NSConsumerSystem.h"
43
44 void * NSConsumerMsgHandleThreadFunc(void * handle);
45
46 void * NSConsumerMsgPushThreadFunc(void * data);
47
48 void NSConsumerTaskProcessing(NSTask * task);
49
50 NSConsumerThread ** NSGetMsgHandleThreadHandle()
51 {
52     static NSConsumerThread * handle = NULL;
53     return & handle;
54 }
55
56 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
57 {
58    *(NSGetMsgHandleThreadHandle()) = handle;
59 }
60
61 NSConsumerQueue ** NSGetMsgHandleQueue()
62 {
63     static NSConsumerQueue * queue = NULL;
64     return & queue;
65 }
66
67 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
68 {
69    *(NSGetMsgHandleQueue()) = queue;
70 }
71
72 NSResult NSConsumerMessageHandlerInit()
73 {
74     NSConsumerThread * handle = NULL;
75     NSConsumerQueue * queue = NULL;
76
77     uint8_t uuid[UUID_SIZE];
78     char uuidString[UUID_STRING_SIZE];
79     OCGenerateUuid(uuid);
80     OCConvertUuidToString(uuid, uuidString);
81     NSSetConsumerId(uuidString);
82     NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
83
84     NS_LOG(DEBUG, "listener init");
85     NSResult ret = NSConsumerListenerInit();
86     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
87
88     NS_LOG(DEBUG, "system init");
89     ret = NSConsumerSystemInit();
90     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
91
92     NS_LOG(DEBUG, "queue thread init");
93     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
94     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
95     NSSetMsgHandleThreadHandle(handle);
96
97     NS_LOG(DEBUG, "create queue");
98     queue = NSCreateQueue();
99     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
100     NSSetMsgHandleQueue(queue);
101
102     return NS_OK;
103 }
104
105 NSResult NSConsumerPushEvent(NSTask * task)
106 {
107     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
108     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
109
110     return NS_OK;
111 }
112
113 void NSConsumerMessageHandlerExit()
114 {
115     NSDestroyMessageCacheList();
116     NSDestroyProviderCacheList();
117     NSConsumerListenerTermiate();
118     NSThreadStop(*(NSGetMsgHandleThreadHandle()));
119     NSDestroyQueue(*(NSGetMsgHandleQueue()));
120     NSSetMsgHandleQueue(NULL);
121 }
122
123 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
124 {
125     NSConsumerQueue * queue = NULL;
126     NSConsumerQueueObject * obj = NULL;
127
128     NS_LOG(DEBUG, "create thread for consumer message handle");
129     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
130     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
131
132     while (true)
133     {
134         if (!queueHandleThread->isStarted)
135         {
136             NS_LOG(ERROR, "msg handler thread will be terminated");
137             break;
138         }
139
140         queue = *(NSGetMsgHandleQueue());
141         if (!queue)
142         {
143             continue;
144         }
145
146         if (NSIsQueueEmpty(queue))
147         {
148             usleep(2000);
149             continue;
150         }
151
152         NSThreadLock(queueHandleThread);
153         NS_LOG(DEBUG, "msg handler working");
154         obj = NSPopQueue(queue);
155
156         if (obj)
157         {
158             NSConsumerTaskProcessing((NSTask *)(obj->data));
159         }
160
161         NSThreadUnlock(queueHandleThread);
162
163     }
164
165     return NULL;
166 }
167
168 void * NSConsumerMsgPushThreadFunc(void * data)
169 {
170     NSConsumerQueueObject * obj = NULL;
171     NSConsumerQueue * queue = NULL;
172
173     NS_LOG(DEBUG, "get queueThread handle");
174     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
175     NS_VERIFY_NOT_NULL(msgHandleThread, NULL);
176
177     NS_LOG(DEBUG, "create queue object");
178     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
179     NS_VERIFY_NOT_NULL(obj, NULL);
180
181     obj->data = data;
182     obj->next = NULL;
183
184     NSThreadLock(msgHandleThread);
185
186     queue = *(NSGetMsgHandleQueue());
187     if (!queue)
188     {
189         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
190         NSOICFree(data);
191         NSOICFree(obj);
192     }
193     else
194     {
195         NSPushQueue(queue, obj);
196     }
197
198     NSThreadUnlock(msgHandleThread);
199
200     return NULL;
201 }
202
203 void NSConsumerTaskProcessing(NSTask * task)
204 {
205     switch (task->taskType)
206     {
207     case TASK_EVENT_CONNECTED:
208     case TASK_EVENT_CONNECTED_TCP:
209     case TASK_CONSUMER_REQ_DISCOVER:
210     {
211         NSConsumerDiscoveryTaskProcessing(task);
212         break;
213     }
214     case TASK_CONSUMER_REQ_SUBSCRIBE:
215     case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
216     case TASK_SEND_SYNCINFO:
217     {
218         NSConsumerCommunicationTaskProcessing(task);
219         break;
220     }
221     case TASK_RECV_SYNCINFO:
222     case TASK_CONSUMER_RECV_MESSAGE:
223     case TASK_CONSUMER_PROVIDER_DISCOVERED:
224     case TASK_CONSUMER_RECV_SUBSCRIBE_CONFIRMED:
225     case TASK_MAKE_SYNCINFO:
226     {
227         NSConsumerInternalTaskProcessing(task);
228         break;
229     }
230     default:
231         NS_LOG(ERROR, "Unknown type of task");
232         break;
233     }
234 }
235
236 NSMessage_consumer * NSConsumerFindNSMessage(const char* messageId)
237 {
238     NS_VERIFY_NOT_NULL(messageId, NULL);
239
240     return NSMessageCacheFind(messageId);
241 }
242
243 NSProvider_internal * NSConsumerFindNSProvider(const char * providerId)
244 {
245     NS_VERIFY_NOT_NULL(providerId, NULL);
246
247     return NSProviderCacheFind(providerId);
248 }