Avoid adding elements to QueueingThread if it's already stopped.
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caqueueingthread.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #ifdef __TIZENRT__
22 #include <tinyara/config.h>
23 #endif
24
25 #include "iotivity_config.h"
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #ifdef HAVE_SYS_TYPES_H
33 #include <sys/types.h>
34 #endif
35
36 #include "caqueueingthread.h"
37 #include "camessagehandler.h"
38 #include "oic_malloc.h"
39 #include "logger.h"
40
41 #define TAG PCF("OIC_CA_QING")
42
43 static void CAQueueingThreadBaseRoutine(void *threadValue)
44 {
45     OIC_LOG(DEBUG, TAG, "message handler main thread start..");
46
47     CAQueueingThread_t *thread = (CAQueueingThread_t *) threadValue;
48
49     if (NULL == thread)
50     {
51         OIC_LOG(ERROR, TAG, "thread data passing error!!");
52         return;
53     }
54
55     while (!thread->isStop)
56     {
57         // mutex lock
58         oc_mutex_lock(thread->threadMutex);
59
60         // if queue is empty, thread will wait
61         if (!thread->isStop && u_queue_get_size(thread->dataQueue) <= 0)
62         {
63             OIC_LOG(DEBUG, TAG, "wait..");
64
65             // wait
66             oc_cond_wait(thread->threadCond, thread->threadMutex);
67
68             OIC_LOG(DEBUG, TAG, "wake up..");
69         }
70
71         // check stop flag
72         if (thread->isStop)
73         {
74             // mutex unlock
75             oc_mutex_unlock(thread->threadMutex);
76             continue;
77         }
78
79         // get data
80         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
81         // mutex unlock
82         oc_mutex_unlock(thread->threadMutex);
83         if (NULL == message)
84         {
85             continue;
86         }
87
88         // process data
89         thread->threadTask(message->msg);
90
91         // free
92         if (NULL != thread->destroy)
93         {
94             thread->destroy(message->msg, message->size);
95         }
96         else
97         {
98             OICFree(message->msg);
99         }
100
101         OICFree(message);
102     }
103
104     oc_mutex_lock(thread->threadMutex);
105     oc_cond_signal(thread->threadCond);
106     oc_mutex_unlock(thread->threadMutex);
107
108     OIC_LOG(DEBUG, TAG, "message handler main thread end..");
109 }
110
111 CAResult_t CAQueueingThreadInitialize(CAQueueingThread_t *thread, ca_thread_pool_t handle,
112                                       CAThreadTask task, CADataDestroyFunction destroy)
113 {
114     if (NULL == thread)
115     {
116         OIC_LOG(ERROR, TAG, "thread instance is empty..");
117         return CA_STATUS_INVALID_PARAM;
118     }
119
120     if (NULL == handle)
121     {
122         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
123         return CA_STATUS_INVALID_PARAM;
124     }
125
126     OIC_LOG(DEBUG, TAG, "thread initialize..");
127
128     // set send thread data
129     thread->threadPool = handle;
130     thread->dataQueue = u_queue_create();
131     thread->threadMutex = oc_mutex_new();
132     thread->threadCond = oc_cond_new();
133     thread->isStop = true;
134     thread->threadTask = task;
135     thread->destroy = destroy;
136     if (NULL == thread->dataQueue || NULL == thread->threadMutex || NULL == thread->threadCond)
137     {
138         goto ERROR_MEM_FAILURE;
139     }
140
141     return CA_STATUS_OK;
142
143 ERROR_MEM_FAILURE:
144     if (thread->dataQueue)
145     {
146         u_queue_delete(thread->dataQueue);
147         thread->dataQueue = NULL;
148     }
149     if (thread->threadMutex)
150     {
151         oc_mutex_free(thread->threadMutex);
152         thread->threadMutex = NULL;
153     }
154     if (thread->threadCond)
155     {
156         oc_cond_free(thread->threadCond);
157         thread->threadCond = NULL;
158     }
159     return CA_MEMORY_ALLOC_FAILED;
160 }
161 #ifndef __TIZENRT__
162 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread)
163 #else
164 CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_name)
165 #endif
166 {
167     if (NULL == thread)
168     {
169         OIC_LOG(ERROR, TAG, "thread instance is empty..");
170         return CA_STATUS_INVALID_PARAM;
171     }
172
173     if (NULL == thread->threadPool)
174     {
175         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
176         return CA_STATUS_INVALID_PARAM;
177     }
178
179     if (false == thread->isStop) //Queueing thread already running
180     {
181         OIC_LOG(DEBUG, TAG, "queueing thread already running..");
182         return CA_STATUS_OK;
183     }
184
185     // mutex lock
186     oc_mutex_lock(thread->threadMutex);
187     thread->isStop = false;
188     // mutex unlock
189     oc_mutex_unlock(thread->threadMutex);
190 #ifndef __TIZENRT__
191     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
192                                              thread, NULL);
193 #else
194     CAResult_t res = ca_thread_pool_add_task(thread->threadPool, CAQueueingThreadBaseRoutine,
195                                              thread, NULL, thread_name,
196                                              CONFIG_IOTIVITY_QUEING_PTHREAD_STACKSIZE);
197 #endif
198     if (res != CA_STATUS_OK)
199     {
200         // update thread status.
201         oc_mutex_lock(thread->threadMutex);
202         thread->isStop = true;
203         oc_mutex_unlock(thread->threadMutex);
204
205         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
206     }
207
208     return res;
209 }
210
211 CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
212 {
213     if (NULL == thread)
214     {
215         OIC_LOG(ERROR, TAG, "thread instance is empty..");
216         return CA_STATUS_INVALID_PARAM;
217     }
218
219     if (NULL == data || 0 == size)
220     {
221         OIC_LOG(ERROR, TAG, "data is empty..");
222         return CA_STATUS_INVALID_PARAM;
223     }
224
225     // create thread data
226     u_queue_message_t *message = (u_queue_message_t *) OICMalloc(sizeof(u_queue_message_t));
227
228     if (NULL == message)
229     {
230         OIC_LOG(ERROR, TAG, "memory error!!");
231         return CA_MEMORY_ALLOC_FAILED;
232     }
233
234     message->msg = data;
235     message->size = size;
236
237     // mutex lock
238     oc_mutex_lock(thread->threadMutex);
239
240     // thread stop
241     if (thread->isStop)
242     {
243         // mutex unlock
244         oc_mutex_unlock(thread->threadMutex);
245
246         OICFree(message);
247         return CA_STATUS_FAILED;
248     }
249
250     // add thread data into list
251     u_queue_add_element(thread->dataQueue, message);
252
253     // notity the thread
254     oc_cond_signal(thread->threadCond);
255
256     // mutex unlock
257     oc_mutex_unlock(thread->threadMutex);
258
259     return CA_STATUS_OK;
260 }
261
262 CAResult_t CAQueueingThreadClearData(CAQueueingThread_t *thread)
263 {
264     if (NULL == thread)
265     {
266         OIC_LOG(ERROR, TAG, "thread instance is empty..");
267         return CA_STATUS_INVALID_PARAM;
268     }
269
270     OIC_LOG(DEBUG, TAG, "clear queue data..");
271
272     // mutex lock
273     oc_mutex_lock(thread->threadMutex);
274
275     // remove all remained list data.
276     while (u_queue_get_size(thread->dataQueue) > 0)
277     {
278         // get data
279         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
280
281         // free
282         if (NULL != message)
283         {
284             if (NULL != thread->destroy)
285             {
286                 thread->destroy(message->msg, message->size);
287             }
288             else
289             {
290                 OICFree(message->msg);
291             }
292
293             OICFree(message);
294         }
295     }
296
297     // mutex unlock
298     oc_mutex_unlock(thread->threadMutex);
299
300     return CA_STATUS_OK;
301 }
302
303 CAResult_t CAQueueingThreadClearContextData(CAQueueingThread_t *thread,
304                                             CAContextDataDestroy callback, void *ctx)
305 {
306     if (NULL == thread)
307     {
308         OIC_LOG(ERROR, TAG, "thread instance is empty..");
309         return CA_STATUS_INVALID_PARAM;
310     }
311
312     if (NULL == callback)
313     {
314         OIC_LOG(ERROR, TAG, "callback is NULL..");
315         return CA_STATUS_INVALID_PARAM;
316     }
317
318     if (NULL == ctx)
319     {
320         OIC_LOG(ERROR, TAG, "ctx is NULL..");
321         return CA_STATUS_INVALID_PARAM;
322     }
323
324     OIC_LOG(DEBUG, TAG, "Clear thread data according to context");
325
326     // mutex lock
327     oc_mutex_lock(thread->threadMutex);
328
329     // remove adapter related list data.
330     u_queue_remove_req_elements(thread->dataQueue, callback, ctx, thread->destroy);
331
332     // mutex unlock
333     oc_mutex_unlock(thread->threadMutex);
334
335     return CA_STATUS_OK;
336 }
337
338 CAResult_t CAQueueingThreadDestroy(CAQueueingThread_t *thread)
339 {
340     if (NULL == thread)
341     {
342         OIC_LOG(ERROR, TAG, "thread instance is empty..");
343         return CA_STATUS_INVALID_PARAM;
344     }
345
346     OIC_LOG(DEBUG, TAG, "thread destroy..");
347
348     // mutex lock
349     oc_mutex_lock(thread->threadMutex);
350
351     // remove all remained list data.
352     while (u_queue_get_size(thread->dataQueue) > 0)
353     {
354         // get data
355         u_queue_message_t *message = u_queue_get_element(thread->dataQueue);
356
357         // free
358         if (NULL != message)
359         {
360             if (NULL != thread->destroy)
361             {
362                 thread->destroy(message->msg, message->size);
363             }
364             else
365             {
366                 OICFree(message->msg);
367             }
368
369             OICFree(message);
370         }
371     }
372
373     // mutex unlock
374     oc_mutex_unlock(thread->threadMutex);
375
376     oc_mutex_free(thread->threadMutex);
377     thread->threadMutex = NULL;
378     oc_cond_free(thread->threadCond);
379     thread->threadCond = NULL;
380
381     u_queue_delete(thread->dataQueue);
382     thread->dataQueue = NULL;
383
384     return CA_STATUS_OK;
385 }
386
387 CAResult_t CAQueueingThreadStop(CAQueueingThread_t *thread)
388 {
389     if (NULL == thread)
390     {
391         OIC_LOG(ERROR, TAG, "thread instance is empty..");
392         return CA_STATUS_INVALID_PARAM;
393     }
394
395     OIC_LOG(DEBUG, TAG, "thread stop request!!");
396
397     if (!thread->isStop)
398     {
399         // mutex lock
400         oc_mutex_lock(thread->threadMutex);
401
402         // set stop flag
403         thread->isStop = true;
404
405         // notify the thread
406         oc_cond_signal(thread->threadCond);
407
408         oc_cond_wait(thread->threadCond, thread->threadMutex);
409
410         // mutex unlock
411         oc_mutex_unlock(thread->threadMutex);
412     }
413
414     return CA_STATUS_OK;
415 }