[IOT-1464] Set NULL after deleting dataQueue of queueing thread
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include "iotivity_config.h"
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif
28 #ifdef HAVE_SYS_TYPES_H
29 #include <sys/types.h>
30 #endif
31
32 #include "caqueueingthread.h"
33 #include "oic_malloc.h"
34 #include "logger.h"
35
36 #define TAG PCF("OIC_CA_QING")
37
38 static void CAQueueingThreadBaseRoutine(void *threadValue)
39 {
40     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
41
42     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
43
44     if (NULL == thread)
45     {
46         OIC_LOG(ERROR, TAG, "thread data passing error!!");
47         return;
48     }
49
50     while (!thread->isStop)
51     {
52         // mutex lock
53         ca_mutex_lock(thread->threadMutex);
54
55         // if queue is empty, thread will wait
56         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
57         {
58             OIC_LOG(DEBUG, TAG, "wait..");
59
60             // wait
61             ca_cond_wait(thread->threadCond, thread->threadMutex);
62
63             OIC_LOG(DEBUG, TAG, "wake up..");
64         }
65
66         // check stop flag
67         if (thread->isStop)
68         {
69             // mutex unlock
70             ca_mutex_unlock(thread->threadMutex);
71             continue;
72         }
73
74         // get data
75         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
76         // mutex unlock
77         ca_mutex_unlock(thread->threadMutex);
78         if (NULL == message)
79         {
80             continue;
81         }
82
83         // process data
84         thread->threadTask(message->msg);
85
86         // free
87         if (NULL != thread->destroy)
88         {
89             thread->destroy(message->msg, message->size);
90         }
91         else
92         {
93             OICFree(message->msg);
94         }
95
96         OICFree(message);
97     }
98
99     ca_mutex_lock(thread->threadMutex);
100     ca_cond_signal(thread->threadCond);
101     ca_mutex_unlock(thread->threadMutex);
102
103     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
104 }
105
106 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
107                                       CAThreadTask task, CADataDestroyFunction destroy)
108 {
109     if (NULL == thread)
110     {
111         OIC_LOG(ERROR, TAG, "thread instance is empty..");
112         return CA_STATUS_INVALID_PARAM;
113     }
114
115     if (NULL == handle)
116     {
117         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
118         return CA_STATUS_INVALID_PARAM;
119     }
120
121     OIC_LOG(DEBUG, TAG, "thread initialize..");
122
123     // set send thread data
124     thread->threadPool = handle;
125     thread->dataQueue = u_queue_create();
126     thread->threadMutex = ca_mutex_new();
127     thread->threadCond = ca_cond_new();
128     thread->isStop = true;
129     thread->threadTask = task;
130     thread->destroy = destroy;
131     if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
132     {
133         goto ERROR_MEM_FAILURE;
134     }
135
136     return CA_STATUS_OK;
137
138 ERROR_MEM_FAILURE:
139     if (thread->dataQueue)
140     {
141         u_queue_delete(thread->dataQueue);
142         thread->dataQueue = NULL;
143     }
144     if (thread->threadMutex)
145     {
146         ca_mutex_free(thread->threadMutex);
147         thread->threadMutex = NULL;
148     }
149     if (thread->threadCond)
150     {
151         ca_cond_free(thread->threadCond);
152         thread->threadCond = NULL;
153     }
154     return CA_MEMORY_ALLOC_FAILED;
155 }
156
157 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
158 {
159     if (NULL == thread)
160     {
161         OIC_LOG(ERROR, TAG, "thread instance is empty..");
162         return CA_STATUS_INVALID_PARAM;
163     }
164
165     if (NULL == thread->threadPool)
166     {
167         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
168         return CA_STATUS_INVALID_PARAM;
169     }
170
171     if (false == thread->isStop) //Queueing thread already running
172     {
173         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
174         return CA_STATUS_OK;
175     }
176
177     // mutex lock
178     ca_mutex_lock(thread->threadMutex);
179     thread->isStop = false;
180     // mutex unlock
181     ca_mutex_unlock(thread->threadMutex);
182
183     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
184                                             thread);
185     if (res != CA_STATUS_OK)
186     {
187         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
188     }
189
190     return res;
191 }
192
193 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
194 {
195     if (NULL == thread)
196     {
197         OIC_LOG(ERROR, TAG, "thread instance is empty..");
198         return CA_STATUS_INVALID_PARAM;
199     }
200
201     if (NULL == data || 0 == size)
202     {
203         OIC_LOG(ERROR, TAG, "data is empty..");
204
205         return CA_STATUS_INVALID_PARAM;
206     }
207
208     // create thread data
209     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
210
211     if (NULL == message)
212     {
213         OIC_LOG(ERROR, TAG, "memory error!!");
214         return CA_MEMORY_ALLOC_FAILED;
215     }
216
217     message->msg = data;
218     message->size = size;
219
220     // mutex lock
221     ca_mutex_lock(thread->threadMutex);
222
223     // add thread data into list
224     u_queue_add_element(thread->dataQueue, message);
225
226     // notity the thread
227     ca_cond_signal(thread->threadCond);
228
229     // mutex unlock
230     ca_mutex_unlock(thread->threadMutex);
231
232     return CA_STATUS_OK;
233 }
234
235 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
236 {
237     if (NULL == thread)
238     {
239         OIC_LOG(ERROR, TAG, "thread instance is empty..");
240         return CA_STATUS_INVALID_PARAM;
241     }
242
243     OIC_LOG(DEBUG, TAG, "thread destroy..");
244
245     ca_mutex_free(thread->threadMutex);
246     thread->threadMutex = NULL;
247     ca_cond_free(thread->threadCond);
248
249     // remove all remained list data.
250     while (u_queue_get_size(thread->dataQueue) > 0)
251     {
252         // get data
253         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
254
255         // free
256         if (NULL != message)
257         {
258             if (NULL != thread->destroy)
259             {
260                 thread->destroy(message->msg, message->size);
261             }
262             else
263             {
264                 OICFree(message->msg);
265             }
266
267             OICFree(message);
268         }
269     }
270
271     u_queue_delete(thread->dataQueue);
272     thread->dataQueue = NULL;
273
274     return CA_STATUS_OK;
275 }
276
277 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
278 {
279     if (NULL == thread)
280     {
281         OIC_LOG(ERROR, TAG, "thread instance is empty..");
282         return CA_STATUS_INVALID_PARAM;
283     }
284
285     OIC_LOG(DEBUG, TAG, "thread stop request!!");
286
287     if (!thread->isStop)
288     {
289         // mutex lock
290         ca_mutex_lock(thread->threadMutex);
291
292         // set stop flag
293         thread->isStop = true;
294
295         // notify the thread
296         ca_cond_signal(thread->threadCond);
297
298         ca_cond_wait(thread->threadCond, thread->threadMutex);
299
300         // mutex unlock
301         ca_mutex_unlock(thread->threadMutex);
302     }
303
304     return CA_STATUS_OK;
305 }