Merge branch 'master' into notification-service
[platform/upstream/iotivity.git] / service / notification / src / consumer / NSConsumerQueueScheduler.c
1 //******************************************************************
2 //
3 // Copyright 2016 Samsung Electronics All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include <stdlib.h>
22 #include <stdbool.h>
23 #include <unistd.h>
24
25 #include "oic_malloc.h"
26 #include "oic_string.h"
27 #include "ocrandom.h"
28
29 #include "NSStructs.h"
30 #include "NSConstants.h"
31 #include "NSConsumerCommon.h"
32 #include "NSConsumerCommunication.h"
33
34 #include "NSThread.h"
35 #include "NSConsumerQueue.h"
36
37 #include "NSConsumerDiscovery.h"
38 #include "NSConsumerInternalTaskController.h"
39 #include "NSConsumerNetworkEventListener.h"
40 #include "NSConsumerQueueScheduler.h"
41 #include "NSConsumerSystem.h"
42
43 void * NSConsumerMsgHandleThreadFunc(void * handle);
44
45 void * NSConsumerMsgPushThreadFunc(void * data);
46
47 void NSConsumerTaskProcessing(NSTask * task);
48
49 NSConsumerThread ** NSGetMsgHandleThreadHandle()
50 {
51     static NSConsumerThread * handle = NULL;
52     return & handle;
53 }
54
55 void NSSetMsgHandleThreadHandle(NSConsumerThread * handle)
56 {
57    *(NSGetMsgHandleThreadHandle()) = handle;
58 }
59
60 NSConsumerQueue ** NSGetMsgHandleQueue()
61 {
62     static NSConsumerQueue * queue = NULL;
63     return & queue;
64 }
65
66 void NSSetMsgHandleQueue(NSConsumerQueue * queue)
67 {
68    *(NSGetMsgHandleQueue()) = queue;
69 }
70
71 NSResult NSConsumerMessageHandlerInit()
72 {
73     NSConsumerThread * handle = NULL;
74     NSConsumerQueue * queue = NULL;
75
76     uint8_t uuid[UUID_SIZE];
77     char uuidString[UUID_STRING_SIZE];
78     OCGenerateUuid(uuid);
79     OCConvertUuidToString(uuid, uuidString);
80     NSSetConsumerId(uuidString);
81     NS_LOG_V(DEBUG, "Consumer ID : %s", *NSGetConsumerId());
82
83     NS_LOG(DEBUG, "listener init");
84     NSResult ret = NSConsumerListenerInit();
85     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
86
87     NS_LOG(DEBUG, "system init");
88     ret = NSConsumerSystemInit();
89     NS_VERIFY_NOT_NULL(ret == NS_OK ? (void *) 1 : NULL, NS_ERROR);
90
91     NS_LOG(DEBUG, "queue thread init");
92     handle = NSThreadInit(NSConsumerMsgHandleThreadFunc, NULL);
93     NS_VERIFY_NOT_NULL(handle, NS_ERROR);
94     NSSetMsgHandleThreadHandle(handle);
95
96     NS_LOG(DEBUG, "create queue");
97     queue = NSCreateQueue();
98     NS_VERIFY_NOT_NULL(queue, NS_ERROR);
99     NSSetMsgHandleQueue(queue);
100
101     return NS_OK;
102 }
103
104 NSResult NSConsumerPushEvent(NSTask * task)
105 {
106     NSConsumerThread * thread = NSThreadInit(NSConsumerMsgPushThreadFunc, (void *) task);
107     NS_VERIFY_NOT_NULL(thread, NS_ERROR);
108
109     return NS_OK;
110 }
111
112 void NSConsumerMessageHandlerExit()
113 {
114     NSDestroyMessageCacheList();
115     NSConsumerListenerTermiate();
116     NSThreadStop(*(NSGetMsgHandleThreadHandle()));
117     NSDestroyQueue(*(NSGetMsgHandleQueue()));
118 }
119
120 void * NSConsumerMsgHandleThreadFunc(void * threadHandle)
121 {
122     NSConsumerQueue * queue = NULL;
123     NSConsumerQueueObject * obj = NULL;
124
125     NS_LOG(DEBUG, "create thread for consumer message handle");
126     NSConsumerThread * queueHandleThread = (NSConsumerThread *) threadHandle;
127     NS_VERIFY_NOT_NULL(queueHandleThread, NULL);
128
129     while (true)
130     {
131         if (!queueHandleThread->isStarted)
132         {
133             NS_LOG(ERROR, "msg handler thread will be terminated");
134             break;
135         }
136
137         queue = *(NSGetMsgHandleQueue());
138         if (!queue)
139         {
140             continue;
141         }
142
143         if (NSIsQueueEmpty(queue))
144         {
145             usleep(2000);
146             continue;
147         }
148
149         NSThreadLock(queueHandleThread);
150         NS_LOG(DEBUG, "msg handler working");
151         obj = NSPopQueue(queue);
152
153         if (obj)
154         {
155             NSConsumerTaskProcessing((NSTask *)(obj->data));
156         }
157
158         NSThreadUnlock(queueHandleThread);
159
160     }
161
162     return NULL;
163 }
164
165 void * NSConsumerMsgPushThreadFunc(void * data)
166 {
167     NSConsumerQueueObject * obj = NULL;
168     NSConsumerQueue * queue = NULL;
169
170     NS_LOG(DEBUG, "get queueThread handle");
171     NSConsumerThread * msgHandleThread = *(NSGetMsgHandleThreadHandle());
172     NS_VERIFY_NOT_NULL(msgHandleThread, NULL);
173
174     NS_LOG(DEBUG, "create queue object");
175     obj = (NSConsumerQueueObject *)OICMalloc(sizeof(NSConsumerQueueObject));
176     NS_VERIFY_NOT_NULL(obj, NULL);
177
178     obj->data = data;
179     obj->next = NULL;
180
181     NSThreadLock(msgHandleThread);
182
183     queue = *(NSGetMsgHandleQueue());
184     if (!queue)
185     {
186         NS_LOG(ERROR, "NSQueue is null. can not insert to queue");
187         OICFree(data);
188         OICFree(obj);
189     }
190     else
191     {
192         NSPushQueue(queue, obj);
193     }
194
195     NSThreadUnlock(msgHandleThread);
196
197     return NULL;
198 }
199
200 void NSConsumerTaskProcessing(NSTask * task)
201 {
202     switch (task->taskType)
203     {
204     case TASK_EVENT_CONNECTED:
205     case TASK_CONSUMER_REQ_DISCOVER:
206     {
207         NSConsumerDiscoveryTaskProcessing(task);
208         break;
209     }
210     case TASK_CONSUMER_REQ_SUBSCRIBE:
211     case TASK_CONSUMER_REQ_SUBSCRIBE_CANCEL:
212     case TASK_SEND_SYNCINFO:
213     {
214         NSConsumerCommunicationTaskProcessing(task);
215         break;
216     }
217     case TASK_RECV_SYNCINFO:
218     case TASK_CONSUMER_RECV_MESSAGE:
219     case TASK_CONSUMER_PROVIDER_DISCOVERED:
220     case TASK_CONSUMER_RECV_SUBSCRIBE_CONFIRMED:
221     case TASK_MAKE_SYNCINFO:
222     {
223         NSConsumerInternalTaskProcessing(task);
224         break;
225     }
226     default:
227         NS_LOG(ERROR, "Unknown type of task");
228         break;
229     }
230 }